当前位置: 首页 > news >正文

FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题

其实代码没有问题,但是我们忽略了一个问题,就是在正常的开发中,肯定是遇到过这样的情况,我们频繁的有客户端链接,断开连接,需要统一的管理这些链接,那么应该如何管理呢。其实可以声明一个类去管理这些链接。接下来我们看下该如何优化。

一、优化测试接口方法

1.定义链接管理类,处理所有链接

"""
websocket 链接管理
"""from typing import List, Dictfrom starlette.websockets import WebSocketclass ConnectionManager:def __init__(self):"""存放链接"""self.active_connections: List[Dict[str, WebSocket]] = []async def connect(self, user: str, ws: WebSocket):"""链接"""self.active_connections.append({"user": user, "ws": ws})async def disconnect(self, user: str, ws: WebSocket):"""断开链接,移除"""self.active_connections.remove({"user": user, "ws": ws})

2.修改应用代码

我们增加了链接,移除链接的操作,那么对应修改下代码

from connection_tool import ConnectionManager
from starlette.websockets import WebSocketDisconnect
ws_manager = ConnectionManager()@app.websocket("/items/ws")
async def websocket_endpoint(websocket: WebSocket,cookie_or_token: str = Depends(get_cookie_or_token),
):await websocket.accept()await ws_manager.connect(cookie_or_token, websocket)try:while True:data = await websocket.receive_text()await websocket.send_text(f"Message is: {data}")except WebSocketDisconnect as e:await ws_manager.disconnect(cookie_or_token, websocket)

3.测试 

这样我们在链接处理的时候就可以正常处理了。之前报错是因为我们没有正常的关闭链接导致的,那么我们再测试一下

"""
测试websockets
"""from fastapi.testclient import TestClient
from main import appdef test_websocket():client = TestClient(app)with client.websocket_connect("/items/ws?token=fake-token") as websocket:websocket.send_text("Hello, this is testing websocket")data = websocket.receive_text()print(data)assert str(data) == f"Message is: Hello, this is testing websocket"if __name__ == '__main__':test_websocket()

此时,发现代码不会再报错 

 二、增加测试用例并优化

1.增加用例代码

import unittestfrom fastapi.testclient import TestClientfrom main import appclass FastApiTestWeb(unittest.TestCase):def setUp(self) -> None:self.client = TestClient(app)def tearDown(self) -> None:self.client = Nonedef test_websocket(self):with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:websocket.send_text("Hello, this is using test case to test websocket")data = websocket.receive_text()print(data)assert str(data) == "Message is: Hello, this is using test case to test websocket"def test_websocket_again(self):with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:websocket.send_text("Hello, this is using test case to test websocket again")data = websocket.receive_text()print(data)assert str(data) == "Message is: Hello, this is using test case to test websocket again"if __name__ == '__main__':unittest.main()

2.执行用例

这样我们的一个测试用例就更加的完整了。我们执行正常是没有报错的

3. 查看代码的覆盖率 

pip install coverage

我们想要看下代码的覆盖率,应该如何看呢。我是用的coverage。

然后再report

  我们想看html测试报告,可以运行下 coverage html。

然后打开index.html

因为我的main.py还有其他的方法,我们还需要点进去看我们对应方法的覆盖率。

如果想要将覆盖率都达到100%,还需要针对其他方法增加测试用例。

到这里,我们对于WebSockets接口测试完毕,但是如果我们想实现上线通知,下线通知,如何实现呢?见下一节。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • ScrapySharp框架:小红书视频数据采集的API集成与应用
  • 使用Docker创建并运行一个create-react-app应用(超简单)
  • 新手-前端生态
  • Qt中https的使用,报错TLS initialization failed和不能打开ssl.lib问题解决
  • Spring Boot(八十):Tesseract实现图片文字自动识别
  • Linux Zip 命令指南
  • [Spring] Spring Web MVC案例实战
  • SpringCloud集成kafka集群
  • MyBatis是如何分页的及原理
  • AWS CDN新增用户ip 地区 城市 响应头
  • 前端a-tree遇到的问题
  • 普通人还有必要学习 Python 之类的编程语言吗?
  • 值得关注的数据资产入表
  • C#开发:Git的安装和使用
  • Linux多线程编程-哲学家就餐问题详解与实现(C语言)
  • 【附node操作实例】redis简明入门系列—字符串类型
  • Android框架之Volley
  • JavaScript标准库系列——Math对象和Date对象(二)
  • Java程序员幽默爆笑锦集
  • Java深入 - 深入理解Java集合
  • JS函数式编程 数组部分风格 ES6版
  • JS实现简单的MVC模式开发小游戏
  • oschina
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • 扫描识别控件Dynamic Web TWAIN v12.2发布,改进SSL证书
  • 通过npm或yarn自动生成vue组件
  • 微信如何实现自动跳转到用其他浏览器打开指定页面下载APP
  • 用element的upload组件实现多图片上传和压缩
  • 在 Chrome DevTools 中调试 JavaScript 入门
  • 怎样选择前端框架
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • # Spring Cloud Alibaba Nacos_配置中心与服务发现(四)
  • (PySpark)RDD实验实战——取最大数出现的次数
  • (八)Docker网络跨主机通讯vxlan和vlan
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (精确度,召回率,真阳性,假阳性)ACC、敏感性、特异性等 ROC指标
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (四)Tiki-taka算法(TTA)求解无人机三维路径规划研究(MATLAB)
  • (一)WLAN定义和基本架构转
  • (一)认识微服务
  • (最新)华为 2024 届秋招-硬件技术工程师-单板硬件开发—机试题—(共12套)(每套四十题)
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • ... fatal error LINK1120:1个无法解析的外部命令 的解决办法
  • ./mysql.server: 没有那个文件或目录_Linux下安装MySQL出现“ls: /var/lib/mysql/*.pid: 没有那个文件或目录”...
  • .ai域名是什么后缀?
  • .mkp勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • .NET CF命令行调试器MDbg入门(一)
  • .Net Core中的内存缓存实现——Redis及MemoryCache(2个可选)方案的实现
  • .NET Micro Framework初体验
  • .NET 服务 ServiceController
  • .Net 垃圾回收机制原理(二)
  • .net连接oracle数据库
  • .NET序列化 serializable,反序列化
  • .NET中统一的存储过程调用方法(收藏)