From ca2caf47bc0ed52899c06c7507cea7a5912fb5ed Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 15 Jan 2025 22:14:57 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B5=81=E5=BC=8F=E5=93=8D?=
 =?UTF-8?q?=E5=BA=94=E7=9A=84=E8=BE=93=E5=87=BA=E6=A0=BC=E5=BC=8F=EF=BC=9A?=
 =?UTF-8?q?=E4=BB=8Eevent-stream=E6=94=B9=E4=B8=BAx-ndjson?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 lightrag/api/lightrag_ollama.py | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/lightrag/api/lightrag_ollama.py b/lightrag/api/lightrag_ollama.py
index 5a066e15..714731eb 100644
--- a/lightrag/api/lightrag_ollama.py
+++ b/lightrag/api/lightrag_ollama.py
@@ -2,6 +2,7 @@ from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Request
 from pydantic import BaseModel
 import logging
 import argparse
+import json
 from typing import List, Dict, Any, Optional
 from lightrag import LightRAG, QueryParam
 from lightrag.llm import openai_complete_if_cache, ollama_embedding
@@ -474,19 +475,18 @@ def create_app(args):
 
         if request.stream:
             from fastapi.responses import StreamingResponse
-            import json
 
             async def stream_generator():
                 async for chunk in response:
-                    yield f"data: {json.dumps({'response': chunk})}\n\n"
+                    yield f"{json.dumps({'response': chunk})}\n"
 
             return StreamingResponse(
                 stream_generator(),
-                media_type="text/event-stream",
+                media_type="application/x-ndjson",
                 headers={
                     "Cache-Control": "no-cache",
                     "Connection": "keep-alive",
-                    "Content-Type": "text/event-stream",
+                    "Content-Type": "application/x-ndjson",
                     "Access-Control-Allow-Origin": "*",
                     "Access-Control-Allow-Methods": "POST, OPTIONS",
                     "Access-Control-Allow-Headers": "Content-Type"
@@ -513,15 +513,15 @@ def create_app(args):
 
             async def stream_generator():
                 async for chunk in response:
-                    yield f"data: {chunk}\n\n"
+                    yield f"{json.dumps({'response': chunk})}\n"
 
             return StreamingResponse(
                 stream_generator(),
-                media_type="text/event-stream",
+                media_type="application/x-ndjson",
                 headers={
                     "Cache-Control": "no-cache",
                     "Connection": "keep-alive",
-                    "Content-Type": "text/event-stream",
+                    "Content-Type": "application/x-ndjson",
                     "Access-Control-Allow-Origin": "*",
                     "Access-Control-Allow-Methods": "POST, OPTIONS",
                     "Access-Control-Allow-Headers": "Content-Type"
@@ -699,7 +699,6 @@ def create_app(args):
 
         if request.stream:
             from fastapi.responses import StreamingResponse
-            import json
 
             response = await rag.aquery(  # 需要 await 来获取异步生成器
                 cleaned_query,
@@ -721,7 +720,7 @@ def create_app(args):
                             },
                             "done": True
                         }
-                        yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
+                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
                     else:
                         # 流式响应
                         async for chunk in response:
@@ -736,7 +735,7 @@ def create_app(args):
                                 },
                                 "done": False
                             }
-                            yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
+                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
                         # 发送完成标记,包含性能统计信息
                         data = {
@@ -750,7 +749,7 @@ def create_app(args):
                             "eval_count": 999,
                             "eval_duration": 1
                         }
-                        yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
+                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
                         return  # 确保生成器在发送完成标记后立即结束
                 except Exception as e:
                     logging.error(f"Error in stream_generator: {str(e)}")
@@ -758,11 +757,11 @@ def create_app(args):
 
             return StreamingResponse(
                 stream_generator(),
-                media_type="text/event-stream",
+                media_type="application/x-ndjson",
                 headers={
                     "Cache-Control": "no-cache",
                     "Connection": "keep-alive",
-                    "Content-Type": "text/event-stream",
+                    "Content-Type": "application/x-ndjson",
                     "Access-Control-Allow-Origin": "*",
                     "Access-Control-Allow-Methods": "POST, OPTIONS",
                     "Access-Control-Allow-Headers": "Content-Type"