修改流式响应的输出格式:从event-stream改为x-ndjson

This commit is contained in:
yangdx
2025-01-15 22:14:57 +08:00
parent 6d44178f63
commit ca2caf47bc

View File

@@ -2,6 +2,7 @@ from fastapi import FastAPI, HTTPException, File, UploadFile, Form, Request
from pydantic import BaseModel
import logging
import argparse
import json
from typing import List, Dict, Any, Optional
from lightrag import LightRAG, QueryParam
from lightrag.llm import openai_complete_if_cache, ollama_embedding
@@ -474,19 +475,18 @@ def create_app(args):
if request.stream:
from fastapi.responses import StreamingResponse
import json
async def stream_generator():
async for chunk in response:
yield f"data: {json.dumps({'response': chunk})}\n\n"
yield f"{json.dumps({'response': chunk})}\n"
return StreamingResponse(
stream_generator(),
media_type="text/event-stream",
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type"
@@ -513,15 +513,15 @@ def create_app(args):
async def stream_generator():
async for chunk in response:
yield f"data: {chunk}\n\n"
yield f"{json.dumps({'response': chunk})}\n"
return StreamingResponse(
stream_generator(),
media_type="text/event-stream",
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type"
@@ -699,7 +699,6 @@ def create_app(args):
if request.stream:
from fastapi.responses import StreamingResponse
import json
response = await rag.aquery( # 需要 await 来获取异步生成器
cleaned_query,
@@ -721,7 +720,7 @@ def create_app(args):
},
"done": True
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
yield f"{json.dumps(data, ensure_ascii=False)}\n"
else:
# 流式响应
async for chunk in response:
@@ -736,7 +735,7 @@ def create_app(args):
},
"done": False
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
yield f"{json.dumps(data, ensure_ascii=False)}\n"
# 发送完成标记,包含性能统计信息
data = {
@@ -750,7 +749,7 @@ def create_app(args):
"eval_count": 999,
"eval_duration": 1
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
yield f"{json.dumps(data, ensure_ascii=False)}\n"
return # 确保生成器在发送完成标记后立即结束
except Exception as e:
logging.error(f"Error in stream_generator: {str(e)}")
@@ -758,11 +757,11 @@ def create_app(args):
return StreamingResponse(
stream_generator(),
media_type="text/event-stream",
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "text/event-stream",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type"