当前位置: 首页 > news >正文

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(六)

 一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    本篇在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)基础上,学习如何集成Tool获取实时数据,并以流式方式返回


二、术语

2.1.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.2.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:

2.3.LangChain支持流式输出的方法

  • stream:基本的流式传输方式,能逐步给出代理的动作和观察结果。
  • astream:异步的流式传输,用于异步处理需求的情况。
  • astream_events:更细致的流式传输,能流式传输代理的每个具体事件,如工具调用和结束、模型启动和结束等,便于深入了解和监控代理执行的详细过程。

2.4.langchainhub

    是 LangChain 相关工具的集合中心,其作用在于方便开发者发现和共享常用的提示(Prompt)、链、代理等。

    它受 Hugging Face Hub 启发,促进社区交流与协作,推动 LangChain 生态发展。当前,它在新架构中被置于 LangSmith 里,主要聚焦于 Prompt。

2.5.asyncio

    是一个用于编写并发代码的标准库,它提供了构建异步应用程序的基础框架。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai
pip install google-search-results langchainhub

3.2. 注册Google Search API账号

参见:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

3.3. 生成Google Search API的KEY


四、技术实现

4.1. 使用Tool&流式输出

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"llm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]template='''
Respond to the human as helpfully and accurately as possible. You have access to the following tools:{tools}Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).Valid "action" values: "Final Answer" or {tool_names}Provide only ONE action per $JSON_BLOB, as shown:```{{"action": $TOOL_NAME,"action_input": $INPUT}}```Follow this format:Question: input question to answerThought: consider previous and subsequent stepsAction:```$JSON_BLOB```Observation: action result... (repeat Thought/Action/Observation N times)Thought: I know what to respondAction:```{{"action": "Final Answer","action_input": "Final response to human"}}Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation
'''
system_message_prompt = SystemMessagePromptTemplate.from_template(template)
human_template='''
{input}{agent_scratchpad}(reminder to respond in a JSON blob no matter what)
'''
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)
prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk =  data['chunk']content =  chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

说明:

流式输出的数据结构为:

{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='天', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}
type: on_chat_model_stream
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='气', id='run-92515b63-4b86-4af8-8515-2f84def9dfab')}, 'run_id': '92515b63-4b86-4af8-8515-2f84def9dfab', 'name': 'ChatOpenAI', 'tags': ['seq:step:3'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 512, 'ls_stop': ['\nObservation']}}

4.2. 通过langchainhub使用公共prompt

   在4.1使用Tool&流式输出的代码基础上进行调整

# -*- coding: utf-8 -*-
import asyncio
import os
from langchain.agents import  create_structured_chat_agent, AgentExecutor
from langchain_community.utilities.serpapi import SerpAPIWrapper
from langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"from langchain import hubllm = ChatOpenAI(model="gpt-3.5-turbo",temperature=0,max_tokens=512)@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resulttools = [search]prompt = hub.pull("hwchase17/structured-chat-agent")print(prompt)agent = create_structured_chat_agent(llm, tools, prompt
)agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)async def chat(params):events = agent_executor.astream_events(params,version="v2")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk =  data['chunk']content =  chunk.contentif content and len(content) > 0:print(content)asyncio.run(chat({"input": "广州现在天气如何?"}))

调用结果:

4.3. 整合代码

在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)的代码基础上进行调整

import uvicorn
import osfrom typing import Annotated
from fastapi import (Depends,FastAPI,WebSocket,WebSocketException,WebSocketDisconnect,status,
)
from langchain import hub
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain_community.utilities import SerpAPIWrapperfrom langchain_core.tools import tool
from langchain_openai import ChatOpenAIos.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'  # 你的Open AI Key
os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"class ConnectionManager:def __init__(self):self.active_connections: list[WebSocket] = []async def connect(self, websocket: WebSocket):await websocket.accept()self.active_connections.append(websocket)def disconnect(self, websocket: WebSocket):self.active_connections.remove(websocket)async def send_personal_message(self, message: str, websocket: WebSocket):await websocket.send_text(message)async def broadcast(self, message: str):for connection in self.active_connections:await connection.send_text(message)manager = ConnectionManager()app = FastAPI()async def authenticate(websocket: WebSocket,userid: str,secret: str,
):if userid is None or secret is None:raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)print(f'userid: {userid},secret: {secret}')if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:return 'pass'else:return 'fail'@tool
def search(query:str):"""只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""serp = SerpAPIWrapper()result = serp.run(query)print("实时搜索结果:", result)return resultdef get_prompt():prompt = hub.pull("hwchase17/structured-chat-agent")return promptasync def chat(query):global llm,toolsagent = create_structured_chat_agent(llm, tools, get_prompt())agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)events = agent_executor.astream_events({"input": query}, version="v1")async for event in events:type = event['event']if 'on_chat_model_stream' == type:data = event['data']chunk = data['chunk']content = chunk.contentif content and len(content) > 0:print(content)yield content@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):await manager.connect(websocket)try:while True:text = await websocket.receive_text()if 'fail' == permission:await manager.send_personal_message(f"authentication failed", websocket)else:if text is not None and len(text) > 0:async for msg in chat(text):await manager.send_personal_message(msg, websocket)except WebSocketDisconnect:manager.disconnect(websocket)print(f"Client #{userid} left the chat")await manager.broadcast(f"Client #{userid} left the chat")if __name__ == '__main__':tools = [search]llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html><head><title>Chat</title></head><body><h1>WebSocket Chat</h1><form action="" onsubmit="sendMessage(event)"><label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label><label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label><br/><button onclick="connect(event)">Connect</button><hr><label>Message: <input type="text" id="messageText" autocomplete="off"/></label><button>Send</button></form><ul id='messages'></ul><script>var ws = null;function connect(event) {var userid = document.getElementById("userid")var secret = document.getElementById("secret")ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);ws.onmessage = function(event) {var messages = document.getElementById('messages')var message = document.createElement('li')var content = document.createTextNode(event.data)message.appendChild(content)messages.appendChild(message)};event.preventDefault()}function sendMessage(event) {var input = document.getElementById("messageText")ws.send(input.value)input.value = ''event.preventDefault()}</script></body>
</html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:

用户输入:广州现在天气如何?

需要调用工具

模型输出:

```
Action:
```
{"action": "Final Answer","action_input": "广州现在的天气是多云,温度为87华氏度,降水概率为7%,湿度为76%,风力为7英里/小时。"
}
```

PS:

1. 上面仅用于演示流式输出的效果,里面包含一些冗余的信息,例如:"action": "Final Answer",要根据实际情况过滤。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

相关文章:

  • 用MySQL+node+vue做一个学生信息管理系统(一):配置项目
  • 【 木兰宽松许可证】
  • win10下Python的安装和卸载
  • 【Python】.py和.pyc文件的区别
  • 【深度学习】注意力机制
  • Unity | Shader基础知识(第十七集:学习Stencil并做出透视效果)
  • 华为SRv6 policy EVPN配置案例
  • CS中的局部性原理
  • vite项目如何在本地启动https协议
  • 【SpringBoot3学习 | 第1篇】SpringBoot3介绍与配置文件
  • 01:Linux的基本命令
  • 强化学习-5 策略梯度、Actor-Critic 算法
  • IOS Swift 从入门到精通:写入 Firestore数据库
  • 【ACM出版,马来西亚-吉隆坡举行】第四届互联网技术与教育信息化国际会议 (ITEI 2024)
  • 电路笔记(PCB):电流容量(IPC-2221和IPC-2152)+阻抗匹配
  • ECS应用管理最佳实践
  • ERLANG 网工修炼笔记 ---- UDP
  • JS专题之继承
  • ucore操作系统实验笔记 - 重新理解中断
  • 成为一名优秀的Developer的书单
  • 分布式任务队列Celery
  • 函数式编程与面向对象编程[4]:Scala的类型关联Type Alias
  • 浅谈sql中的in与not in,exists与not exists的区别
  • 移动端高清、多屏适配方案
  • ​LeetCode解法汇总2808. 使循环数组所有元素相等的最少秒数
  • ​业务双活的数据切换思路设计(下)
  • # Swust 12th acm 邀请赛# [ A ] A+B problem [题解]
  • #我与Java虚拟机的故事#连载10: 如何在阿里、腾讯、百度、及字节跳动等公司面试中脱颖而出...
  • (70min)字节暑假实习二面(已挂)
  • (一)SpringBoot3---尚硅谷总结
  • (转载)跟我一起学习VIM - The Life Changing Editor
  • .net core 3.0 linux,.NET Core 3.0 的新增功能
  • .NET框架设计—常被忽视的C#设计技巧
  • .Net中的设计模式——Factory Method模式
  • .net中我喜欢的两种验证码
  • .skip() 和 .only() 的使用
  • [ACTF2020 新生赛]Upload 1
  • [Angular 基础] - 指令(directives)
  • [Angular] 笔记 18:Angular Router
  • [BZOJ 4034][HAOI2015]T2 [树链剖分]
  • [ERROR] ocp-server-ce-py_script_start_check-4.2.1 RuntimeError: ‘tenant_name‘
  • [FxCop.设计规则]8. 也许参数类型应该是基类型
  • [IM] [Webhook] Webhook实现IM平台机器人
  • [java/jdbc]插入数据时获取自增长主键的值
  • [LeetCode] Merge Two Sorted Lists
  • [NBIoT]NBIoT相关知识
  • [ndss 2023]确保联邦敏感主题分类免受中毒攻击
  • [nlp] grad norm先降后升再降
  • [node] Node.js 缓冲区Buffer
  • [OpenCV学习笔记]获取鼠标处图像的坐标和像素值
  • [Python] scikit-learn之mean_squared_error函数(Mean Squared Error(MSE))介绍和使用案例
  • [SpringBoot]接口的多实现:选择性注入SpringBoot接口的实现类
  • [SRM603] WinterAndSnowmen
  • [SWPUCTF 2021 新生赛]easy_md5
  • [UI5 常用控件] 08.Wizard,NavContainer