From 69f200faf2f755b709c07b4557714fe8f3ace930 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 09:46:56 +0800
Subject: [PATCH 1/5] feat: improve error handling for streaming responses
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Add CancelledError handling for streams
• Send error details to client in JSON
• Add error status codes and messages
• Always send final completion marker
• Refactor stream generator error handling
---
 lightrag/api/lightrag_server.py | 115 +++++++++++++++++++++-----------
 1 file changed, 75 insertions(+), 40 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index ec58f552..27042a27 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -12,7 +12,7 @@ from fastapi import (
 # Add this to store progress globally
 from typing import Dict
 import threading
-
+import asyncio
 import json
 import os

@@ -1718,11 +1718,11 @@ def create_app(args):
             )

             async def stream_generator():
+                first_chunk_time = None
+                last_chunk_time = None
+                total_response = ""
+
                 try:
-                    first_chunk_time = None
-                    last_chunk_time = None
-                    total_response = ""
-
                     # Ensure response is an async generator
                     if isinstance(response, str):
                         # If it's a string, send in two parts
@@ -1760,46 +1760,81 @@ def create_app(args):
                         }
                         yield f"{json.dumps(data, ensure_ascii=False)}\n"
                     else:
-                        async for chunk in response:
-                            if chunk:
-                                if first_chunk_time is None:
-                                    first_chunk_time = time.time_ns()
+                        try:
+                            async for chunk in response:
+                                if chunk:
+                                    if first_chunk_time is None:
+                                        first_chunk_time = time.time_ns()

-                                last_chunk_time = time.time_ns()
+                                    last_chunk_time = time.time_ns()

-                                total_response += chunk
-                                data = {
-                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                    "message": {
-                                        "role": "assistant",
-                                        "content": chunk,
-                                        "images": None,
-                                    },
-                                    "done": False,
-                                }
-                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                                    total_response += chunk
+                                    data = {
+                                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                        "message": {
+                                            "role": "assistant",
+                                            "content": chunk,
+                                            "images": None,
+                                        },
+                                        "done": False,
+                                    }
+                                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                        except asyncio.CancelledError:
+                            error_data = {
+                                "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                "error": {
+                                    "code": "STREAM_CANCELLED",
+                                    "message": "Stream was cancelled by server"
+                                },
+                                "done": False
+                            }
+                            yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+                            raise

-                        completion_tokens = estimate_tokens(total_response)
-                        total_time = last_chunk_time - start_time
-                        prompt_eval_time = first_chunk_time - start_time
-                        eval_time = last_chunk_time - first_chunk_time
+                        if last_chunk_time is not None:
+                            completion_tokens = estimate_tokens(total_response)
+                            total_time = last_chunk_time - start_time
+                            prompt_eval_time = first_chunk_time - start_time
+                            eval_time = last_chunk_time - first_chunk_time
+
+                            data = {
+                                "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                "done": True,
+                                "total_duration": total_time,
+                                "load_duration": 0,
+                                "prompt_eval_count": prompt_tokens,
+                                "prompt_eval_duration": prompt_eval_time,
+                                "eval_count": completion_tokens,
+                                "eval_duration": eval_time,
+                            }
+                            yield f"{json.dumps(data, ensure_ascii=False)}\n"

-                        data = {
-                            "model": ollama_server_infos.LIGHTRAG_MODEL,
-                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                            "done": True,
-                            "total_duration": total_time,
-                            "load_duration": 0,
-                            "prompt_eval_count": prompt_tokens,
-                            "prompt_eval_duration": prompt_eval_time,
-                            "eval_count": completion_tokens,
-                            "eval_duration": eval_time,
-                        }
-                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                        return  # Ensure the generator ends immediately after sending the completion marker
                 except Exception as e:
-                    logging.error(f"Error in stream_generator: {str(e)}")
+                    error_msg = f"Error in stream_generator: {str(e)}"
+                    logging.error(error_msg)
+
+                    # 发送错误消息给客户端
+                    error_data = {
+                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "error": {
+                            "code": "STREAM_ERROR",
+                            "message": error_msg
+                        },
+                        "done": False
+                    }
+                    yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                    # 确保发送结束标记
+                    final_data = {
+                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "done": True
+                    }
+                    yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
                     raise

             return StreamingResponse(
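
Note: the wire format this generator produces is newline-delimited JSON: each line is a self-contained object, content chunks carry "done": false, errors arrive either as an "error" object or (after patch 4 below) as regular assistant message content, and a final "done": true record carries nanosecond timing statistics. For illustration only, a minimal client-side sketch of consuming that stream; the /api/chat path, port 9621, the model name, and the use of the requests library are assumptions, not taken from this patch:

import json
import requests  # any streaming-capable HTTP client works

def consume_stream(prompt: str, base_url: str = "http://localhost:9621"):
    # Endpoint path assumed to be the Ollama-compatible chat route.
    payload = {
        "model": "lightrag:latest",  # illustrative model name
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }
    with requests.post(f"{base_url}/api/chat", json=payload, stream=True) as resp:
        for line in resp.iter_lines():
            if not line:
                continue
            data = json.loads(line)
            if "error" in data:
                # Error frames introduced by this patch: {"code": ..., "message": ...}
                print(f"stream error: {data['error']['message']}")
            elif data.get("done"):
                # Final frame carries timing stats in nanoseconds (total_duration, eval_count, ...)
                print(f"\ndone in {data.get('total_duration', 0) / 1e9:.2f}s")
                break
            else:
                print(data["message"]["content"], end="", flush=True)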

From ff40e61fad530ef516934d7881f9ad1f6f999311 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 09:47:39 +0800
Subject: [PATCH 2/5] Fix linting
---
 lightrag/api/lightrag_server.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 27042a27..131be01f 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1721,7 +1721,7 @@ def create_app(args):
                 first_chunk_time = None
                 last_chunk_time = None
                 total_response = ""
-
+
                 try:
                     # Ensure response is an async generator
                     if isinstance(response, str):
@@ -1786,9 +1786,9 @@ def create_app(args):
                                 "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                                 "error": {
                                     "code": "STREAM_CANCELLED",
-                                    "message": "Stream was cancelled by server"
+                                    "message": "Stream was cancelled by server",
                                 },
-                                "done": False
+                                "done": False,
                             }
                             yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
                             raise
@@ -1815,24 +1815,21 @@ def create_app(args):
                 except Exception as e:
                     error_msg = f"Error in stream_generator: {str(e)}"
                     logging.error(error_msg)
-
+
                     # 发送错误消息给客户端
                     error_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "error": {
-                            "code": "STREAM_ERROR",
-                            "message": error_msg
-                        },
-                        "done": False
+                        "error": {"code": "STREAM_ERROR", "message": error_msg},
+                        "done": False,
                     }
                     yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-
+
                     # 确保发送结束标记
                     final_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "done": True
+                        "done": True,
                     }
                     yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
                     raise

From 24effb127dbff8ddfc9868984460bd99ebbf65cd Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 10:44:48 +0800
Subject: [PATCH 3/5] Improve error handling and response consistency in streaming endpoints
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Add error message forwarding to client
• Handle stream cancellations gracefully
• Add logging for stream errors
• Ensure clean stream termination
• Add try-catch in OpenAI streaming
---
 lightrag/api/lightrag_server.py | 38 ++++++++++++++++++++++++---------
 lightrag/llm/openai.py          | 18 ++++++++++------
 2 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 131be01f..d8412a13 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1780,18 +1780,34 @@ def create_app(args):
                                         "done": False,
                                     }
                                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                        except asyncio.CancelledError:
+                        except (asyncio.CancelledError, Exception) as e:
+                            error_msg = str(e)
+                            if isinstance(e, asyncio.CancelledError):
+                                error_msg = "Stream was cancelled by server"
+                            else:
+                                error_msg = f"Provider error: {error_msg}"
+
+                            logging.error(f"Stream error: {error_msg}")
+
+                            # Send error message to client
                             error_data = {
                                 "model": ollama_server_infos.LIGHTRAG_MODEL,
                                 "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                                 "error": {
-                                    "code": "STREAM_CANCELLED",
-                                    "message": "Stream was cancelled by server",
+                                    "code": "STREAM_ERROR",
+                                    "message": error_msg
                                 },
-                                "done": False,
                             }
                             yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-                            raise
+
+                            # Send final message to close the stream
+                            final_data = {
+                                "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                "done": True,
+                            }
+                            yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                            return

                         if last_chunk_time is not None:
                             completion_tokens = estimate_tokens(total_response)
@@ -1816,23 +1832,25 @@ def create_app(args):
                     error_msg = f"Error in stream_generator: {str(e)}"
                     logging.error(error_msg)

-                    # 发送错误消息给客户端
+                    # Send error message to client
                     error_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "error": {"code": "STREAM_ERROR", "message": error_msg},
-                        "done": False,
+                        "error": {
+                            "code": "STREAM_ERROR",
+                            "message": error_msg
+                        },
                     }
                     yield f"{json.dumps(error_data, ensure_ascii=False)}\n"

-                    # 确保发送结束标记
+                    # Ensure sending end marker
                     final_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
                         "done": True,
                     }
                     yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                    raise
+                    return

             return StreamingResponse(
                 stream_generator(),
diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 11ba69c0..4eaca093 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -125,13 +125,17 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
-            async for chunk in response:
-                content = chunk.choices[0].delta.content
-                if content is None:
-                    continue
-                if r"\u" in content:
-                    content = safe_unicode_decode(content.encode("utf-8"))
-                yield content
+            try:
+                async for chunk in response:
+                    content = chunk.choices[0].delta.content
+                    if content is None:
+                        continue
+                    if r"\u" in content:
+                        content = safe_unicode_decode(content.encode("utf-8"))
+                    yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise

         return inner()
     else:
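
Note: the pattern patch 3 settles on is worth stating in isolation: the stream generator converts both cancellation and provider failures into frames the client can render, then terminates cleanly with return instead of re-raising. Because asyncio.CancelledError derives from BaseException rather than Exception since Python 3.8, it has to be listed explicitly for that branch to be reachable. A stripped-down sketch of that shape (names simplified; not the exact server code):

import asyncio
import json

async def guarded_stream(source):
    # Wrap an async chunk source so failures become stream frames instead of a broken connection.
    try:
        async for chunk in source:
            yield json.dumps({"message": {"role": "assistant", "content": chunk}, "done": False}) + "\n"
    except (asyncio.CancelledError, Exception) as exc:
        # CancelledError is not an Exception subclass (Python 3.8+), hence the explicit listing.
        if isinstance(exc, asyncio.CancelledError):
            msg = "Stream was cancelled by server"
        else:
            msg = f"Provider error: {exc}"
        yield json.dumps({"error": {"code": "STREAM_ERROR", "message": msg}}) + "\n"
        yield json.dumps({"done": True}) + "\n"
        return  # terminate cleanly rather than re-raising, as patch 3 does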

From f1ea7f7415ca53adf2576dab03d50ed137d5d61d Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 11:07:31 +0800
Subject: [PATCH 4/5] update error response format in streaming API to a
 normal message, so users can see what's going on
---
 lightrag/api/lightrag_server.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index d8412a13..c9144d0e 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1793,10 +1793,12 @@ def create_app(args):
                             error_data = {
                                 "model": ollama_server_infos.LIGHTRAG_MODEL,
                                 "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "error": {
-                                    "code": "STREAM_ERROR",
-                                    "message": error_msg
+                                "message": {
+                                    "role": "assistant",
+                                    "content": f"\n\nError: {error_msg}",
+                                    "images": None
                                 },
+                                "done": False
                             }
                             yield f"{json.dumps(error_data, ensure_ascii=False)}\n"

From f77faf80237bd474e4c5f3fd33addfe2698e95fd Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 12:36:52 +0800
Subject: [PATCH 5/5] Fix linting
---
 lightrag/api/lightrag_server.py | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index aadd1c09..345136aa 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1793,9 +1793,9 @@ def create_app(args):
                                 error_msg = "Stream was cancelled by server"
                             else:
                                 error_msg = f"Provider error: {error_msg}"
-
+
                             logging.error(f"Stream error: {error_msg}")
-
+
                             # Send error message to client
                             error_data = {
                                 "model": ollama_server_infos.LIGHTRAG_MODEL,
@@ -1803,12 +1803,12 @@ def create_app(args):
                                 "message": {
                                     "role": "assistant",
                                     "content": f"\n\nError: {error_msg}",
-                                    "images": None
+                                    "images": None,
                                 },
-                                "done": False
+                                "done": False,
                             }
                             yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-
+
                             # Send final message to close the stream
                             final_data = {
                                 "model": ollama_server_infos.LIGHTRAG_MODEL,
@@ -1845,10 +1845,7 @@ def create_app(args):
                     error_data = {
                         "model": ollama_server_infos.LIGHTRAG_MODEL,
                         "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "error": {
-                            "code": "STREAM_ERROR",
-                            "message": error_msg
-                        },
+                        "error": {"code": "STREAM_ERROR", "message": error_msg},
                     }
                     yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
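
Note: the net effect of patches 4 and 5 on what a client sees when the chunk stream itself fails (provider error or cancellation) is that the error now travels as ordinary assistant message content rather than a bare error object, so Ollama-style chat UIs that ignore unknown fields still show the user what happened; the outer generator-level failure path keeps the "error" object form. Roughly, with illustrative field values (Python dict literals, not taken from a real run):

# Error frame emitted before patch 4 (many chat clients silently drop it):
{"model": "lightrag:latest", "created_at": "...",
 "error": {"code": "STREAM_ERROR", "message": "Provider error: ..."}}

# Error frame emitted after patch 4 (rendered like any other assistant message):
{"model": "lightrag:latest", "created_at": "...",
 "message": {"role": "assistant", "content": "\n\nError: Provider error: ...", "images": None},
 "done": False}

# In both cases the generator then emits a final frame so the client terminates cleanly:
{"model": "lightrag:latest", "created_at": "...", "done": True}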