Improve error handling and response consistency in streaming endpoints

• Add error message forwarding to client
• Handle stream cancellations gracefully
• Add logging for stream errors
• Ensure clean stream termination
• Add try/except around OpenAI streaming (a minimal sketch of the overall pattern follows the commit metadata below)
Author: yangdx
Date: 2025-02-05 10:44:48 +08:00
parent ff40e61fad
commit 24effb127d
2 changed files with 39 additions and 17 deletions
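
The sketch below illustrates the pattern the bullets above describe: catch failures inside the streaming generator, log them, forward them to the client as an explicit error frame, and still terminate the NDJSON stream with a final done frame. It is a minimal illustration rather than the LightRAG code itself; the /api/chat route, MODEL_NAME constant, and fetch_chunks() provider stub are assumed names.

# Minimal sketch of the error-forwarding streaming pattern (illustrative, not LightRAG's code)
import asyncio
import json
import logging

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
MODEL_NAME = "lightrag:latest"  # placeholder model identifier


async def fetch_chunks():
    # Placeholder for the upstream LLM stream; yields text pieces.
    for piece in ("Hello", ", ", "world"):
        await asyncio.sleep(0)
        yield piece


@app.post("/api/chat")
async def chat():
    async def stream_generator():
        try:
            async for piece in fetch_chunks():
                data = {"model": MODEL_NAME, "response": piece, "done": False}
                yield f"{json.dumps(data, ensure_ascii=False)}\n"
        except (asyncio.CancelledError, Exception) as e:
            # CancelledError is caught explicitly because it is not an Exception subclass on Python 3.8+.
            if isinstance(e, asyncio.CancelledError):
                msg = "Stream was cancelled by server"
            else:
                msg = f"Provider error: {e}"
            logging.error(f"Stream error: {msg}")
            error_data = {
                "model": MODEL_NAME,
                "error": {"code": "STREAM_ERROR", "message": msg},
                "done": False,
            }
            yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
        # Always close the stream with a final done frame so clients can stop reading cleanly.
        final_data = {"model": MODEL_NAME, "done": True}
        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"

    return StreamingResponse(stream_generator(), media_type="application/x-ndjson")

Yielding the error as a normal frame, instead of re-raising and dropping the connection, is what lets Ollama-style clients surface a readable message.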


@@ -1780,18 +1780,34 @@ def create_app(args):
                             "done": False,
                         }
                         yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                except asyncio.CancelledError:
+                except (asyncio.CancelledError, Exception) as e:
+                    error_msg = str(e)
+                    if isinstance(e, asyncio.CancelledError):
+                        error_msg = "Stream was cancelled by server"
+                    else:
+                        error_msg = f"Provider error: {error_msg}"
+                    logging.error(f"Stream error: {error_msg}")
+                    # Send error message to client
                     error_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                         "error": {
-                            "code": "STREAM_CANCELLED",
-                            "message": "Stream was cancelled by server",
+                            "code": "STREAM_ERROR",
+                            "message": error_msg
                         },
                         "done": False,
                     }
                     yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-                    raise
+                    # Send final message to close the stream
+                    final_data = {
+                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "done": True,
+                    }
+                    yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                    return

                 if last_chunk_time is not None:
                     completion_tokens = estimate_tokens(total_response)
@@ -1816,23 +1832,25 @@ def create_app(args):
                 error_msg = f"Error in stream_generator: {str(e)}"
                 logging.error(error_msg)

-                # 发送错误消息给客户端
+                # Send error message to client
                 error_data = {
                     "model": ollama_server_infos.LIGHTRAG_MODEL,
                     "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "error": {"code": "STREAM_ERROR", "message": error_msg},
                     "done": False,
+                    "error": {
+                        "code": "STREAM_ERROR",
+                        "message": error_msg
+                    },
                 }
                 yield f"{json.dumps(error_data, ensure_ascii=False)}\n"

-                # 确保发送结束标记
+                # Ensure sending end marker
                 final_data = {
                     "model": ollama_server_infos.LIGHTRAG_MODEL,
                     "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                     "done": True,
                 }
                 yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                raise
+                return

         return StreamingResponse(
             stream_generator(),

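With the changes in this file, a consumer of the endpoint no longer sees the connection simply drop on failure: errors arrive as a STREAM_ERROR frame and every stream still ends with a done: true frame. A rough client-side sketch follows; the host, port, route, and request payload are assumptions for illustration, and the exact field carrying normal content depends on the Ollama-style schema the server emits.

# Illustrative NDJSON consumer (URL and payload are assumed, not taken from the diff)
import json

import requests

resp = requests.post(
    "http://localhost:9621/api/chat",  # assumed LightRAG API server address
    json={"model": "lightrag:latest", "messages": [{"role": "user", "content": "hi"}], "stream": True},
    stream=True,
)
for line in resp.iter_lines():
    if not line:
        continue
    frame = json.loads(line)
    if "error" in frame:
        # Provider failures are now forwarded instead of silently closing the stream.
        print("stream failed:", frame["error"]["message"])
        break
    if frame.get("done"):
        break
    # Normal frames carry generated text; the field name follows the Ollama-style format.
    print(frame.get("message", {}).get("content", ""), end="", flush=True)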

@@ -125,6 +125,7 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
+            try:
                 async for chunk in response:
                     content = chunk.choices[0].delta.content
                     if content is None:
@@ -132,6 +133,9 @@ async def openai_complete_if_cache(
                     if r"\u" in content:
                         content = safe_unicode_decode(content.encode("utf-8"))
                     yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise

         return inner()
     else:
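
The try/except added around the OpenAI chunk loop is the provider-side half of the same pattern: log the failure and re-raise so the server-level stream_generator above can convert it into the client-visible STREAM_ERROR frame. The same idea can be written as a reusable wrapper; this is a sketch, and log_stream_errors is an assumed helper name, not something in the repository.

# Sketch of a generic pass-through wrapper that logs and re-raises provider stream errors
import logging
from typing import AsyncIterator

logger = logging.getLogger(__name__)


async def log_stream_errors(source: AsyncIterator[str]) -> AsyncIterator[str]:
    # Yield chunks unchanged; on failure, log and re-raise so the caller
    # (e.g. the endpoint's stream_generator) can report the error to the client.
    try:
        async for chunk in source:
            yield chunk
    except Exception as e:
        logger.error(f"Error in stream response: {str(e)}")
        raise

Re-raising rather than swallowing keeps the layers composable: the provider wrapper stays transport-agnostic, and the endpoint decides how the failure is presented to the client.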