From 24effb127dbff8ddfc9868984460bd99ebbf65cd Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 10:44:48 +0800
Subject: [PATCH] Improve error handling and response consistency in streaming
 endpoints
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Add error message forwarding to client
• Handle stream cancellations gracefully
• Add logging for stream errors
• Ensure clean stream termination
• Add try-catch in OpenAI streaming
---
 lightrag/api/lightrag_server.py | 38 ++++++++++++++++++++++++---------
 lightrag/llm/openai.py          | 18 ++++++++++------
 2 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 131be01f..d8412a13 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1780,18 +1780,34 @@ def create_app(args):
                         "done": False,
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
-            except asyncio.CancelledError:
+            except (asyncio.CancelledError, Exception) as e:
+                error_msg = str(e)
+                if isinstance(e, asyncio.CancelledError):
+                    error_msg = "Stream was cancelled by server"
+                else:
+                    error_msg = f"Provider error: {error_msg}"
+
+                logging.error(f"Stream error: {error_msg}")
+
+                # Send error message to client
                 error_data = {
                     "model": ollama_server_infos.LIGHTRAG_MODEL,
                     "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                     "error": {
-                        "code": "STREAM_CANCELLED",
-                        "message": "Stream was cancelled by server",
+                        "code": "STREAM_ERROR",
+                        "message": error_msg
                     },
-                    "done": False,
                 }
                 yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-                raise
+
+                # Send final message to close the stream
+                final_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "done": True,
+                }
+                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                return

             if last_chunk_time is not None:
                 completion_tokens = estimate_tokens(total_response)
@@ -1816,23 +1832,25 @@
                 error_msg = f"Error in stream_generator: {str(e)}"
                 logging.error(error_msg)

-                # 发送错误消息给客户端
+                # Send error message to client
                 error_data = {
                     "model": ollama_server_infos.LIGHTRAG_MODEL,
                     "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "error": {"code": "STREAM_ERROR", "message": error_msg},
-                    "done": False,
+                    "error": {
+                        "code": "STREAM_ERROR",
+                        "message": error_msg
+                    },
                 }
                 yield f"{json.dumps(error_data, ensure_ascii=False)}\n"

-                # 确保发送结束标记
+                # Ensure sending end marker
                 final_data = {
                     "model": ollama_server_infos.LIGHTRAG_MODEL,
                     "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                     "done": True,
                 }
                 yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                raise
+                return

         return StreamingResponse(
             stream_generator(),
diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 11ba69c0..4eaca093 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -125,13 +125,17 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
-            async for chunk in response:
-                content = chunk.choices[0].delta.content
-                if content is None:
-                    continue
-                if r"\u" in content:
-                    content = safe_unicode_decode(content.encode("utf-8"))
-                yield content
+            try:
+                async for chunk in response:
+                    content = chunk.choices[0].delta.content
+                    if content is None:
+                        continue
+                    if r"\u" in content:
+                        content = safe_unicode_decode(content.encode("utf-8"))
+                    yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise

         return inner()
     else:
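
Note (illustration, not part of the patch): after this change both streaming paths report errors in-band as NDJSON and always finish with a "done": true message instead of aborting the connection. The sketch below shows how a client might consume such a stream; the host/port, the /api/chat path, and the "message.content" chunk field are assumptions based on the Ollama-compatible API, not something this diff confirms.

# Hypothetical client sketch for the NDJSON stream produced by the patched endpoint.
import asyncio
import json

import httpx


async def consume_chat_stream(url: str, payload: dict) -> str:
    parts: list[str] = []
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as resp:
            async for line in resp.aiter_lines():
                if not line.strip():
                    continue
                data = json.loads(line)
                if "error" in data:
                    # Errors are now forwarded to the client instead of the
                    # connection simply dropping.
                    raise RuntimeError(data["error"]["message"])
                if data.get("done"):
                    # The stream now always terminates with a done marker.
                    break
                # Chunk field name assumed from the Ollama chat format.
                parts.append(data.get("message", {}).get("content", ""))
    return "".join(parts)


if __name__ == "__main__":
    answer = asyncio.run(
        consume_chat_stream(
            "http://localhost:9621/api/chat",  # assumed default host/port/path
            {
                "model": "lightrag:latest",
                "messages": [{"role": "user", "content": "Hello"}],
                "stream": True,
            },
        )
    )
    print(answer)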