Streaming local LLM with FastAPI, Llama.cpp and Langchain




I have set up FastAPI with Llama.cpp and Langchain. Now I want to enable streaming in the FastAPI responses. Streaming works with Llama.cpp in my terminal, but I wasn't able to implement it with a FastAPI response.


Most tutorials focus on enabling streaming with an OpenAI model, but I am using a local LLM (quantized Mistral) with llama.cpp. I think I have to modify the callback handler, but none of the tutorials I tried worked. Here is my code:


from fastapi import FastAPI, Request, Response
from langchain_community.llms import LlamaCpp
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
import copy
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

model_path = "../modelle/mixtral-8x7b-instruct-v0.1.Q5_K_M.gguf"

prompt = """
<s> [INST] Im folgenden bekommst du eine Aufgabe. Erledige diese anhand des User Inputs.

### Hier die Aufgabe: ###
{typescript_string}

### Hier der User Input: ###
{input}

Antwort: [/INST]
"""


def model_response_prompt():
    return PromptTemplate(template=prompt, input_variables=['input', 'typescript_string'])


def build_llm(model_path, callback=None):
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
    # callback_manager = CallbackManager(callback)

    n_gpu_layers = 1  # Metal: set to 1 is enough. Tried with several values.
    n_batch = 512  # 1024  # Should be between 1 and n_ctx; consider the amount of RAM of your Apple Silicon chip.

    llm = LlamaCpp(
        max_tokens=1000,
        n_threads=6,
        model_path=model_path,
        temperature=0.8,
        f16_kv=True,
        n_ctx=28000,
        n_gpu_layers=n_gpu_layers,
        n_batch=n_batch,
        callback_manager=callback_manager,
        verbose=True,
        top_p=0.75,
        top_k=40,
        repeat_penalty=1.1,
        streaming=True,
        model_kwargs={'mirostat': 2},
    )
    return llm


# caching LLM
def get_cached_llm():
    chat = build_llm(model_path)
    return chat


chat = get_cached_llm()

app = FastAPI(
    title="Inference API for Mistral and Mixtral",
    description="A simple API that use Mistral or Mixtral",
    version="1.0",
)


def bullet_point_model():
    llm = build_llm(model_path=model_path)
    llm_chain = LLMChain(
        llm=llm,
        prompt=model_response_prompt(),
        verbose=True,
    )
    return llm_chain


@app.get('/model_response')
async def model(question: str, prompt: str):
    model = bullet_point_model()
    res = model({"typescript_string": prompt, "input": question})
    result = copy.deepcopy(res)
    return result

In an example notebook, I am calling FastAPI like this:


import  subprocess
import urllib.parse
import shlex
query = input("Insert your bullet points here: ")
task = input("Insert the task here: ")
# Safely encode the URL query strings
encodedquery = urllib.parse.quote(query)
encodedtask = urllib.parse.quote(task)
# Build the curl command text
command = f"curl -X 'GET' '{encodedquery}&prompt={encodedtask}' -H 'accept: application/json'"
args = shlex.split(command)
process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

With this code, getting responses from the API works. But I only see streaming in my terminal (I think this is because of the StreamingStdOutCallbackHandler). Only after the streaming in the terminal is complete do I get my FastAPI response.


What do I have to change so that I can stream token by token with FastAPI and a local llama.cpp model?



I was doing the same thing and hit a similar issue: FastAPI was not streaming the response even though I was using the StreamingResponse API. Eventually I got the following code to work. There are three important parts:


  • Make sure to use StreamingResponse to wrap an Iterator.


  • Make sure the Iterator sends a newline character \n at the end of each streamed chunk.


  • Make sure to use streaming APIs to connect to your LLMs. For example, the _client.chat function in my example uses httpx to connect to the REST APIs of the LLMs. If you use the requests package in its default, non-streaming mode, it won't work, because the whole response body is read before you can iterate over it.

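To make the newline point concrete, here is a minimal, stdlib-only sketch of newline-delimited JSON framing. The helper names `to_ndjson` and `parse_ndjson` and the token payloads are made up for illustration and are not part of FastAPI or of my code. It shows why the trailing \n matters: the receiver can only tell where one message ends once it sees the newline, no matter how the network splits the bytes.

```python
import json
from typing import Iterable, Iterator


def to_ndjson(items: Iterable[dict]) -> Iterator[str]:
    """Serialize each item as one JSON document terminated by a newline."""
    for item in items:
        yield json.dumps(item) + "\n"


def parse_ndjson(chunks: Iterable[str]) -> Iterator[dict]:
    """Reassemble complete lines from arbitrarily split chunks and decode them."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Only text up to a newline is a complete message; keep the rest buffered.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line:
                yield json.loads(line)


# Hypothetical token payloads standing in for real model output.
tokens = [{"token": "Hello"}, {"token": " world"}]
wire = "".join(to_ndjson(tokens))

# Simulate the network splitting the stream at an arbitrary offset.
received = list(parse_ndjson([wire[:10], wire[10:]]))
```

A client that reads the response line by line gets one decodable JSON object per line, which is what the application/x-ndjson media type signals.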

import json
from typing import Iterator

from fastapi import Request
from fastapi.responses import StreamingResponse


# Method of the class that owns self._client and self.logger
async def chat(self, request: Request):
    """
    Generate a chat response using the requested model.
    """
    # Passing request body JSON to parameters of function _chat
    # Request body follows ollama API's chat request format for now.
    params = await request.json()
    self.logger.debug("Request data: %s", params)

    chat_response = self._client.chat(**params)

    # Always return as streaming
    if isinstance(chat_response, Iterator):
        def generate_response():
            for response in chat_response:
                # One JSON document per line, terminated by a newline
                yield json.dumps(response) + "\n"
        return StreamingResponse(generate_response(), media_type="application/x-ndjson")
    elif chat_response is not None:
        return json.dumps(chat_response)


