当前位置: 首页 > news >正文

【FastAPI】服务器使用SSE实现客户端之间的广播和点对点功能

在 FastAPI 中实现使用SSE实现客户端之间的广播和点对点功能,可以通过以下步骤实现:

  1. 安装依赖:

    FastAPI 本身就支持 SSE,你只需要安装 fastapiuvicorn 来运行应用程序。

    pip install fastapi uvicorn
    
  2. SSE 服务端代码:

    下面的代码展示了如何用 FastAPI 实现 SSE 服务器。我们将用一个全局字典 clients 来存储每个客户端的事件流,并支持广播和点对点消息发送。

    from fastapi import FastAPI, Request
    from fastapi.responses import StreamingResponse
    from starlette.responses import JSONResponse
    from typing import Dict
    import asyncioapp = FastAPI()# 存储所有连接的客户端
    clients: Dict[str, asyncio.Queue] = {}# SSE 事件生成器
    async def event_generator(queue: asyncio.Queue):try:while True:data = await queue.get()yield f"data: {data}\n\n"except asyncio.CancelledError:pass# SSE 连接端点
    @app.get("/sse/{client_id}")
    async def sse(client_id: str):queue = asyncio.Queue()clients[client_id] = queuereturn StreamingResponse(event_generator(queue), media_type="text/event-stream")# 广播消息给所有客户端
    @app.post("/broadcast")
    async def broadcast_message(message: dict):for queue in clients.values():await queue.put(message['content'])return JSONResponse({"status": "Broadcasted"})# 点对点消息发送
    @app.post("/send/{client_id}")
    async def send_message(client_id: str, message: dict):if client_id in clients:await clients[client_id].put(message['content'])return JSONResponse({"status": f"Message sent to {client_id}"})return JSONResponse({"status": "Client not found"}, status_code=404)# 客户端断开连接处理
    @app.get("/disconnect/{client_id}")
    async def disconnect_client(client_id: str):if client_id in clients:del clients[client_id]return JSONResponse({"status": f"Disconnected client {client_id}"})return JSONResponse({"status": "Client not found"}, status_code=404)

功能说明:

  1. SSE 连接: 每个客户端可以通过 /sse/{client_id} 连接到 SSE 端点,其中 {client_id} 是该客户端的唯一标识符。每个连接都分配了一个独立的事件队列,用于接收消息。

  2. 广播消息: 客户端可以通过 /broadcast POST 请求向所有连接的客户端广播消息。

    请求体示例:

    {"content": "This is a broadcast message"
    }
    
  3. 点对点消息发送: 通过 /send/{client_id} POST 请求,可以将消息发送给指定的客户端。

    请求体示例:

    {"content": "Private message to specific client"
    }
    
  4. 客户端断开: 通过 /disconnect/{client_id} 可以手动断开特定客户端的连接。

客户端示例:

前端使用 JavaScript 来实现 SSE 客户端:

const clientId = 'client1';  // 替换为你客户端的唯一ID
const eventSource = new EventSource(`/sse/${clientId}`);eventSource.onmessage = function(event) {console.log("New message from server:", event.data);
};// 断开连接时
eventSource.close();

总结:

  1. event_generator 负责生成 SSE 消息流。
  2. clients 字典用于管理每个连接的客户端,允许广播和点对点消息。
  3. 每个客户端都可以通过唯一 ID 建立连接,发送或接收消息。

这种方式可以轻松扩展以支持更多的业务逻辑,例如认证、分组广播等。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • C#命令行参数解析库System.CommandLine介绍
  • vue3+ant design vue 中弹窗自定义按钮设置及以冒号为基准布局
  • 关于文件操作
  • CAD图1
  • 功能测试干了三年,快要废了。。。
  • 多维度测评!希亦、美的、大宇三大爆款超声波清洗机,谁才是实力王?
  • 『 Linux 』HTTP(一)
  • 【GO开发】MacOS上搭建GO的基础环境-Hello World
  • python植物大战僵尸项目源码【免费】
  • AI 大模型开发 —— 面对转行挑战与机遇,探索有效学习开启职业转型
  • Java学习路线指南
  • BUUCTF-MISC-easycap
  • 打工人、设计师必备的AI抠图工具
  • 不会JS逆向也能高效结合Scrapy与Selenium实现爬虫抓取
  • stm32g431rbt6芯片中VREF+是什么?在电路中怎么设计?
  • Angular2开发踩坑系列-生产环境编译
  • CentOS学习笔记 - 12. Nginx搭建Centos7.5远程repo
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • jquery ajax学习笔记
  • JS基础篇--通过JS生成由字母与数字组合的随机字符串
  • Median of Two Sorted Arrays
  • Python 反序列化安全问题(二)
  • python 装饰器(一)
  • SegmentFault 2015 Top Rank
  • SpingCloudBus整合RabbitMQ
  • spring cloud gateway 源码解析(4)跨域问题处理
  • TypeScript实现数据结构(一)栈,队列,链表
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • vue脚手架vue-cli
  • 使用putty远程连接linux
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 异常机制详解
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • 昨天1024程序员节,我故意写了个死循环~
  • ​虚拟化系列介绍(十)
  • # Panda3d 碰撞检测系统介绍
  • # 利刃出鞘_Tomcat 核心原理解析(二)
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • $.ajax中的eval及dataType
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (1)Android开发优化---------UI优化
  • (2015)JS ES6 必知的十个 特性
  • (27)4.8 习题课
  • (51单片机)第五章-A/D和D/A工作原理-A/D
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (编译到47%失败)to be deleted
  • (补充)IDEA项目结构
  • (每日一问)设计模式:设计模式的原则与分类——如何提升代码质量?
  • (免费分享)基于springboot,vue疗养中心管理系统
  • (四)软件性能测试
  • (一) storm的集群安装与配置
  • (原創) 未来三学期想要修的课 (日記)
  • (转)大型网站架构演变和知识体系