From 847963d19a99213a826d60e8251004bcc20ff0a8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 17 Jan 2025 03:35:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20/query=20=E5=92=8C=20/quer?= =?UTF-8?q?y/stream=20=E7=AB=AF=E7=82=B9=E5=A4=84=E7=90=86stream=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E6=98=AF=E7=9A=84=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lightrag/api/lightrag_ollama.py | 51 ++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/lightrag/api/lightrag_ollama.py b/lightrag/api/lightrag_ollama.py index bd068653..af3f22ee 100644 --- a/lightrag/api/lightrag_ollama.py +++ b/lightrag/api/lightrag_ollama.py @@ -489,27 +489,21 @@ def create_app(args): ), ) - if request.stream: - from fastapi.responses import StreamingResponse - - async def stream_generator(): - async for chunk in response: - yield f"{json.dumps({'response': chunk})}\n" - - return StreamingResponse( - stream_generator(), - media_type="application/x-ndjson", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "Content-Type": "application/x-ndjson", - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Methods": "POST, OPTIONS", - "Access-Control-Allow-Headers": "Content-Type" - } - ) - else: + # 如果响应是字符串(比如命中缓存),直接返回 + if isinstance(response, str): return QueryResponse(response=response) + + # 如果是异步生成器,根据stream参数决定是否流式返回 + if request.stream: + result = "" + async for chunk in response: + result += chunk + return QueryResponse(response=result) + else: + result = "" + async for chunk in response: + result += chunk + return QueryResponse(response=result) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @@ -528,8 +522,18 @@ def create_app(args): from fastapi.responses import StreamingResponse async def stream_generator(): - async for chunk in response: - yield f"{json.dumps({'response': chunk})}\n" + if isinstance(response, str): + # 如果是字符串,一次性发送 + yield f"{json.dumps({'response': response})}\n" + else: + # 如果是异步生成器,逐块发送 + try: + async for chunk in response: + if chunk: # 只发送非空内容 + yield f"{json.dumps({'response': chunk})}\n" + except Exception as e: + logging.error(f"Streaming error: {str(e)}") + yield f"{json.dumps({'error': str(e)})}\n" return StreamingResponse( stream_generator(), @@ -540,7 +544,8 @@ def create_app(args): "Content-Type": "application/x-ndjson", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", - "Access-Control-Allow-Headers": "Content-Type" + "Access-Control-Allow-Headers": "Content-Type", + "X-Accel-Buffering": "no" # 禁用 Nginx 缓冲 } ) except Exception as e: