feat: improve error handling for streaming responses

• Add CancelledError handling for streams
• Send error details to client in JSON
• Add error status codes and messages
• Always send final completion marker
• Refactor stream generator error handling
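
In outline, the change below makes the stream generator report problems to the client as JSON lines instead of silently dropping the connection. A minimal sketch of the pattern, assuming `response` is an async iterator of text chunks (the payload fields are trimmed down here; the real diff carries the full Ollama-compatible fields plus token and timing statistics):

import asyncio
import json
import logging

async def stream_generator(response):
    total_response = ""
    try:
        try:
            # Stream each chunk to the client as one NDJSON line
            async for chunk in response:
                total_response += chunk
                yield json.dumps({"message": {"content": chunk}, "done": False}) + "\n"
        except asyncio.CancelledError:
            # Report the cancellation to the client, then re-raise.
            # CancelledError is a BaseException, so the handler below does not swallow it.
            yield json.dumps(
                {"error": {"code": "STREAM_CANCELLED",
                           "message": "Stream was cancelled by server"},
                 "done": False}
            ) + "\n"
            raise
        # Normal completion marker
        yield json.dumps({"done": True}) + "\n"
    except Exception as e:
        logging.error(f"Error in stream_generator: {e}")
        # Send the error details, then always finish with a completion marker
        yield json.dumps({"error": {"code": "STREAM_ERROR", "message": str(e)},
                          "done": False}) + "\n"
        yield json.dumps({"done": True}) + "\n"
        raise

In the actual endpoint this generator is wrapped in a StreamingResponse, as shown at the end of the diff.
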
yangdx
2025-02-05 09:46:56 +08:00
parent 0c8a2bface
commit 69f200faf2

@@ -12,7 +12,7 @@ from fastapi import (
# Add this to store progress globally
from typing import Dict
import threading
import asyncio
import json
import os
@@ -1718,11 +1718,11 @@ def create_app(args):
)
async def stream_generator():
try:
first_chunk_time = None
last_chunk_time = None
total_response = ""
first_chunk_time = None
last_chunk_time = None
total_response = ""
try:
# Ensure response is an async generator
if isinstance(response, str):
# If it's a string, send in two parts
@@ -1760,46 +1760,81 @@ def create_app(args):
}
yield f"{json.dumps(data, ensure_ascii=False)}\n"
else:
async for chunk in response:
if chunk:
if first_chunk_time is None:
first_chunk_time = time.time_ns()
try:
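# Stream the model output chunk by chunk; a cancellation while iterating is handled by the except below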
async for chunk in response:
if chunk:
if first_chunk_time is None:
first_chunk_time = time.time_ns()
last_chunk_time = time.time_ns()
last_chunk_time = time.time_ns()
total_response += chunk
data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"message": {
"role": "assistant",
"content": chunk,
"images": None,
},
"done": False,
}
yield f"{json.dumps(data, ensure_ascii=False)}\n"
total_response += chunk
data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"message": {
"role": "assistant",
"content": chunk,
"images": None,
},
"done": False,
}
yield f"{json.dumps(data, ensure_ascii=False)}\n"
except asyncio.CancelledError:
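# The streaming task was cancelled; send a structured error to the client before re-raising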
error_data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"error": {
"code": "STREAM_CANCELLED",
"message": "Stream was cancelled by server"
},
"done": False
}
yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
raise
completion_tokens = estimate_tokens(total_response)
total_time = last_chunk_time - start_time
prompt_eval_time = first_chunk_time - start_time
eval_time = last_chunk_time - first_chunk_time
if last_chunk_time is not None:
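# Only emit the timing/usage summary when at least one chunk was actually received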
completion_tokens = estimate_tokens(total_response)
total_time = last_chunk_time - start_time
prompt_eval_time = first_chunk_time - start_time
eval_time = last_chunk_time - first_chunk_time
data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"done": True,
"total_duration": total_time,
"load_duration": 0,
"prompt_eval_count": prompt_tokens,
"prompt_eval_duration": prompt_eval_time,
"eval_count": completion_tokens,
"eval_duration": eval_time,
}
yield f"{json.dumps(data, ensure_ascii=False)}\n"
data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"done": True,
"total_duration": total_time,
"load_duration": 0,
"prompt_eval_count": prompt_tokens,
"prompt_eval_duration": prompt_eval_time,
"eval_count": completion_tokens,
"eval_duration": eval_time,
}
yield f"{json.dumps(data, ensure_ascii=False)}\n"
return # Ensure the generator ends immediately after sending the completion marker
except Exception as e:
logging.error(f"Error in stream_generator: {str(e)}")
error_msg = f"Error in stream_generator: {str(e)}"
logging.error(error_msg)
# Send the error details to the client
error_data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"error": {
"code": "STREAM_ERROR",
"message": error_msg
},
"done": False
}
yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
# Ensure the final completion marker is sent
final_data = {
"model": ollama_server_infos.LIGHTRAG_MODEL,
"created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
"done": True
}
yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
raise
return StreamingResponse(