From 69f200faf2f755b709c07b4557714fe8f3ace930 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Wed, 5 Feb 2025 09:46:56 +0800
Subject: [PATCH] feat: improve error handling for streaming responses
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Add CancelledError handling for streams
• Send error details to client in JSON
• Add error status codes and messages
• Always send final completion marker
• Refactor stream generator error handling
---
 lightrag/api/lightrag_server.py | 115 +++++++++++++++++++++-----------
 1 file changed, 75 insertions(+), 40 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index ec58f552..27042a27 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -12,7 +12,7 @@ from fastapi import (
 # Add this to store progress globally
 from typing import Dict
 import threading
-
+import asyncio
 import json
 import os
 
@@ -1718,11 +1718,11 @@ def create_app(args):
         )
 
         async def stream_generator():
+            first_chunk_time = None
+            last_chunk_time = None
+            total_response = ""
+
             try:
-                first_chunk_time = None
-                last_chunk_time = None
-                total_response = ""
-
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
@@ -1760,46 +1760,81 @@ def create_app(args):
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
                 else:
-                    async for chunk in response:
-                        if chunk:
-                            if first_chunk_time is None:
-                                first_chunk_time = time.time_ns()
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
 
-                            last_chunk_time = time.time_ns()
+                                last_chunk_time = time.time_ns()
 
-                            total_response += chunk
-                            data = {
-                                "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "message": {
-                                    "role": "assistant",
-                                    "content": chunk,
-                                    "images": None,
-                                },
-                                "done": False,
-                            }
-                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                                total_response += chunk
+                                data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "message": {
+                                        "role": "assistant",
+                                        "content": chunk,
+                                        "images": None,
+                                    },
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except asyncio.CancelledError:
+                        error_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "error": {
+                                "code": "STREAM_CANCELLED",
+                                "message": "Stream was cancelled by server"
+                            },
+                            "done": False
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+                        raise
 
-                    completion_tokens = estimate_tokens(total_response)
-                    total_time = last_chunk_time - start_time
-                    prompt_eval_time = first_chunk_time - start_time
-                    eval_time = last_chunk_time - first_chunk_time
+                    if last_chunk_time is not None:
+                        completion_tokens = estimate_tokens(total_response)
+                        total_time = last_chunk_time - start_time
+                        prompt_eval_time = first_chunk_time - start_time
+                        eval_time = last_chunk_time - first_chunk_time
+
+                        data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                            "total_duration": total_time,
+                            "load_duration": 0,
+                            "prompt_eval_count": prompt_tokens,
+                            "prompt_eval_duration": prompt_eval_time,
+                            "eval_count": completion_tokens,
+                            "eval_duration": eval_time,
+                        }
+                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
-                    data = {
-                        "model": ollama_server_infos.LIGHTRAG_MODEL,
-                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "done": True,
-                        "total_duration": total_time,
-                        "load_duration": 0,
-                        "prompt_eval_count": prompt_tokens,
-                        "prompt_eval_duration": prompt_eval_time,
-                        "eval_count": completion_tokens,
-                        "eval_duration": eval_time,
-                    }
-                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                    return  # Ensure the generator ends immediately after sending the completion marker
             except Exception as e:
-                logging.error(f"Error in stream_generator: {str(e)}")
+                error_msg = f"Error in stream_generator: {str(e)}"
+                logging.error(error_msg)
+
+                # Send the error details to the client
+                error_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "error": {
+                        "code": "STREAM_ERROR",
+                        "message": error_msg
+                    },
+                    "done": False
+                }
+                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                # Make sure the final completion marker is sent
+                final_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "done": True
+                }
+                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
                 raise
 
         return StreamingResponse(