diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 27ce8a8d..345136aa 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -12,7 +12,7 @@ from fastapi import (
 # Add this to store progress globally
 from typing import Dict
 import threading
-
+import asyncio
 import json
 import os
 
@@ -1725,11 +1725,11 @@ def create_app(args):
                 )
 
                 async def stream_generator():
-                    try:
-                        first_chunk_time = None
-                        last_chunk_time = None
-                        total_response = ""
+                    first_chunk_time = None
+                    last_chunk_time = None
+                    total_response = ""
 
+                    try:
                         # Ensure response is an async generator
                         if isinstance(response, str):
                             # If it's a string, send in two parts
@@ -1767,47 +1767,96 @@ def create_app(args):
                             }
                             yield f"{json.dumps(data, ensure_ascii=False)}\n"
                         else:
-                            async for chunk in response:
-                                if chunk:
-                                    if first_chunk_time is None:
-                                        first_chunk_time = time.time_ns()
+                            try:
+                                async for chunk in response:
+                                    if chunk:
+                                        if first_chunk_time is None:
+                                            first_chunk_time = time.time_ns()
 
-                                    last_chunk_time = time.time_ns()
+                                        last_chunk_time = time.time_ns()
 
-                                    total_response += chunk
-                                    data = {
-                                        "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                        "message": {
-                                            "role": "assistant",
-                                            "content": chunk,
-                                            "images": None,
-                                        },
-                                        "done": False,
-                                    }
-                                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                                        total_response += chunk
+                                        data = {
+                                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                            "message": {
+                                                "role": "assistant",
+                                                "content": chunk,
+                                                "images": None,
+                                            },
+                                            "done": False,
+                                        }
+                                        yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                            except (asyncio.CancelledError, Exception) as e:
+                                error_msg = str(e)
+                                if isinstance(e, asyncio.CancelledError):
+                                    error_msg = "Stream was cancelled by server"
+                                else:
+                                    error_msg = f"Provider error: {error_msg}"
 
-                            completion_tokens = estimate_tokens(total_response)
-                            total_time = last_chunk_time - start_time
-                            prompt_eval_time = first_chunk_time - start_time
-                            eval_time = last_chunk_time - first_chunk_time
+                                logging.error(f"Stream error: {error_msg}")
+
+                                # Send error message to client
+                                error_data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "message": {
+                                        "role": "assistant",
+                                        "content": f"\n\nError: {error_msg}",
+                                        "images": None,
+                                    },
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                                # Send final message to close the stream
+                                final_data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "done": True,
+                                }
+                                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                                return
+
+                            if last_chunk_time is not None:
+                                completion_tokens = estimate_tokens(total_response)
+                                total_time = last_chunk_time - start_time
+                                prompt_eval_time = first_chunk_time - start_time
+                                eval_time = last_chunk_time - first_chunk_time
+
+                                data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "done": True,
+                                    "total_duration": total_time,
+                                    "load_duration": 0,
+                                    "prompt_eval_count": prompt_tokens,
+                                    "prompt_eval_duration": prompt_eval_time,
+                                    "eval_count": completion_tokens,
+                                    "eval_duration": eval_time,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
-                            data = {
-                                "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "done": True,
-                                "total_duration": total_time,
-                                "load_duration": 0,
-                                "prompt_eval_count": prompt_tokens,
-                                "prompt_eval_duration": prompt_eval_time,
-                                "eval_count": completion_tokens,
-                                "eval_duration": eval_time,
-                            }
-                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
-                            return  # Ensure the generator ends immediately after sending the completion marker
                     except Exception as e:
-                        logging.error(f"Error in stream_generator: {str(e)}")
-                        raise
+                        error_msg = f"Error in stream_generator: {str(e)}"
+                        logging.error(error_msg)
+
+                        # Send error message to client
+                        error_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "error": {"code": "STREAM_ERROR", "message": error_msg},
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Ensure sending end marker
+                        final_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return
 
                 return StreamingResponse(
                     stream_generator(),
diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 11ba69c0..4eaca093 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -125,13 +125,17 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):
 
         async def inner():
-            async for chunk in response:
-                content = chunk.choices[0].delta.content
-                if content is None:
-                    continue
-                if r"\u" in content:
-                    content = safe_unicode_decode(content.encode("utf-8"))
-                yield content
+            try:
+                async for chunk in response:
+                    content = chunk.choices[0].delta.content
+                    if content is None:
+                        continue
+                    if r"\u" in content:
+                        content = safe_unicode_decode(content.encode("utf-8"))
+                    yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise
 
         return inner()
     else: