当前位置: 首页 > news >正文

How can I stream a response from LangChain‘s OpenAI using Flask API?

题意:怎样在 Flask API 中使用 LangChain 的 OpenAI 模型流式传输响应

问题背景:

I am using Python Flask app for chat over data. In the console I am getting streamable response directly from the OpenAI since I can enable streming with a flag streaming=True.

我正在使用 Python Flask 应用程序进行数据聊天。在控制台中,我直接从 OpenAI 获取流式响应,因为我可以通过设置 `streaming=True` 来启用流式传输。

The problem is, that I can't "forward" the stream or "show" the stream than in my API call.

问题是,我无法在 API 调用中“转发”或“显示”这个流式响应。

Code for the processing OpenAI and chain is:

处理 OpenAI 和链的代码如下:

def askQuestion(self, collection_id, question):collection_name = "collection-" + str(collection_id)self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),return_source_documents=True,verbose=VERBOSE, memory=self.memory)result = self.chain({"question": question})res_dict = {"answer": result["answer"],}res_dict["source_documents"] = []for source in result["source_documents"]:res_dict["source_documents"].append({"page_content": source.page_content,"metadata":  source.metadata})return res_dict

and the API route code:        以及 API 路由的代码:

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):question = request.form["question"]# response_generator = document_thread.askQuestion(collection_id, question)# return jsonify(response_generator)def stream(question):completion = document_thread.askQuestion(collection_id, question)for line in completion['answer']:yield line

I am testing my endpoint with curl and I am passing flag -N to curl, so I should get the streamable response, if it is possible.

我正在使用 curl 测试我的端点,并传递了 `-N` 标志,因此如果可能的话,我应该能得到流式响应。

When I make API call first the endpoint is waiting to process the data (I can see in my terminal in VS code the streamable answer) and when finished, I get everything displayed in one go.

当我发起 API 调用时,端点首先等待处理数据(我可以在 VS Code 的终端中看到流式的回答),处理完成后,所有内容一次性显示出来。

问题解决:

With the usage of threading and callback we can have a streaming response from flask API.

通过使用 `threading` 和 `callback`,我们可以在 Flask API 中实现流式响应。

In flask API, you may create a queue to register tokens through langchain's callback.

在 Flask API 中,可以创建一个队列,通过 LangChain 的回调函数来注册令牌。

class StreamingHandler(BaseCallbackHandler):...def on_llm_new_token(self, token: str, **kwargs) -> None:self.queue.put(token)

You may get tokens from the same queue in your flask route.

你可以在 Flask 路由中从同一个队列获取令牌。

from flask import Response, stream_with_context
import threading @app.route(....):
def stream_output():q = Queue()def generate(rq: Queue):...# add your logic to prevent while loop# to run indefinitely  while( ...):yield rq.get()callback_fn = StreamingHandler(q)threading.Thread(target= askQuestion, args=(collection_id, question, callback_fn))return Response(stream_with_context(generate(q))

In your langchain's ChatOpenAI add the above custom callback StreamingHandler.

在你的 LangChain 的 `ChatOpenAI` 中添加上述自定义回调 `StreamingHandler`。

self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback=[callback_fn,]
)

For reference:        参考如下

  • https://python.langchain.com/en/latest/modules/callbacks/getting_started.html#creating-a-custom-handler
  • Streaming Contents — Flask Documentation (2.3.x)

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 面试经典150题——删除有序数组中的重复项
  • 某花顺爬虫逆向分析
  • 基于主从Reactor模型实现高并发服务器
  • 【后端】【nginx】nginx常用命令
  • 高校心理辅导系统:Spring Boot技术实现指南
  • 三十种编程语言庆祝【国庆节】!!!
  • flask的学习记录
  • Transformer模型-7- Decoder
  • 如何有效检测住宅IP真伪?
  • Lubuntu电源管理
  • 深度学习经典模型之BERT(上)
  • 深入解析仓颉语言中的变量操作与赋值技巧
  • 十六,Spring Boot 整合 Druid 以及使用 Druid 监控功能
  • 【C++】C++入门概念(一)
  • 大数据-143 - ClickHouse 集群 SQL 超详细实践记录!
  • @angular/forms 源码解析之双向绑定
  • [nginx文档翻译系列] 控制nginx
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 【剑指offer】让抽象问题具体化
  • 〔开发系列〕一次关于小程序开发的深度总结
  • 2017 前端面试准备 - 收藏集 - 掘金
  • 2017-08-04 前端日报
  • Bootstrap JS插件Alert源码分析
  • Java面向对象及其三大特征
  • js正则,这点儿就够用了
  • Vultr 教程目录
  • zookeeper系列(七)实战分布式命名服务
  • 订阅Forge Viewer所有的事件
  • 多线程事务回滚
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 如何利用MongoDB打造TOP榜小程序
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 机器人开始自主学习,是人类福祉,还是定时炸弹? ...
  • ​【经验分享】微机原理、指令判断、判断指令是否正确判断指令是否正确​
  • ​经​纬​恒​润​二​面​​三​七​互​娱​一​面​​元​象​二​面​
  • ​字​节​一​面​
  • # centos7下FFmpeg环境部署记录
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (2021|NIPS,扩散,无条件分数估计,条件分数估计)无分类器引导扩散
  • (void) (_x == _y)的作用
  • (vue)el-tabs选中最后一项后更新数据后无法展开
  • (八)c52学习之旅-中断实验
  • (多级缓存)多级缓存
  • (论文阅读40-45)图像描述1
  • (四)React组件、useState、组件样式
  • (五)c52学习之旅-静态数码管
  • (一)、python程序--模拟电脑鼠走迷宫
  • (转载)Google Chrome调试JS
  • ***检测工具之RKHunter AIDE
  • **CI中自动类加载的用法总结
  • 、写入Shellcode到注册表上线
  • .bat批处理(十):从路径字符串中截取盘符、文件名、后缀名等信息
  • .bat批处理(五):遍历指定目录下资源文件并更新
  • .htaccess 强制https 单独排除某个目录