Merge pull request #712 from danielaskdd/handle-stream-cancel-error

Improve LLM Error Handling for API Server
Authored by zrguo on 2025-02-05 13:47:17 +08:00, committed by GitHub
2 changed files with 101 additions and 48 deletions

View File

@@ -12,7 +12,7 @@ from fastapi import (
 # Add this to store progress globally
 from typing import Dict
 import threading
+import asyncio
 import json
 import os
@@ -1725,11 +1725,11 @@ def create_app(args):
         )

         async def stream_generator():
-            try:
-                first_chunk_time = None
-                last_chunk_time = None
-                total_response = ""
+            first_chunk_time = None
+            last_chunk_time = None
+            total_response = ""

+            try:
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
@@ -1767,47 +1767,96 @@ def create_app(args):
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
                 else:
-                    async for chunk in response:
-                        if chunk:
-                            if first_chunk_time is None:
-                                first_chunk_time = time.time_ns()
-                            last_chunk_time = time.time_ns()
-                            total_response += chunk
-                            data = {
-                                "model": ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "message": {
-                                    "role": "assistant",
-                                    "content": chunk,
-                                    "images": None,
-                                },
-                                "done": False,
-                            }
-                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
+                                last_chunk_time = time.time_ns()
+                                total_response += chunk
+                                data = {
+                                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "message": {
+                                        "role": "assistant",
+                                        "content": chunk,
+                                        "images": None,
+                                    },
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+
+                        logging.error(f"Stream error: {error_msg}")
+
+                        # Send error message to client
+                        error_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "message": {
+                                "role": "assistant",
+                                "content": f"\n\nError: {error_msg}",
+                                "images": None,
+                            },
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return

-                completion_tokens = estimate_tokens(total_response)
-                total_time = last_chunk_time - start_time
-                prompt_eval_time = first_chunk_time - start_time
-                eval_time = last_chunk_time - first_chunk_time
+                if last_chunk_time is not None:
+                    completion_tokens = estimate_tokens(total_response)
+                    total_time = last_chunk_time - start_time
+                    prompt_eval_time = first_chunk_time - start_time
+                    eval_time = last_chunk_time - first_chunk_time

-                data = {
-                    "model": ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "done": True,
-                    "total_duration": total_time,
-                    "load_duration": 0,
-                    "prompt_eval_count": prompt_tokens,
-                    "prompt_eval_duration": prompt_eval_time,
-                    "eval_count": completion_tokens,
-                    "eval_duration": eval_time,
-                }
-                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    data = {
+                        "model": ollama_server_infos.LIGHTRAG_MODEL,
+                        "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                        "done": True,
+                        "total_duration": total_time,
+                        "load_duration": 0,
+                        "prompt_eval_count": prompt_tokens,
+                        "prompt_eval_duration": prompt_eval_time,
+                        "eval_count": completion_tokens,
+                        "eval_duration": eval_time,
+                    }
+                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    return  # Ensure the generator ends immediately after sending the completion marker
             except Exception as e:
-                logging.error(f"Error in stream_generator: {str(e)}")
-                raise
+                error_msg = f"Error in stream_generator: {str(e)}"
+                logging.error(error_msg)
+
+                # Send error message to client
+                error_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "error": {"code": "STREAM_ERROR", "message": error_msg},
+                }
+                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                # Ensure sending end marker
+                final_data = {
+                    "model": ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "done": True,
+                }
+                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                return

         return StreamingResponse(
             stream_generator(),
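
Note on the change above: the chunk loop now distinguishes a cancelled upstream stream from a provider failure and, in either case, still emits an error frame followed by a "done": true frame instead of dying mid-stream. The sketch below is a minimal, self-contained illustration of that pattern rather than the PR's code: the frame layout is simplified, and the upstream() generator only simulates a provider stream being cancelled. One detail worth noting is that on Python 3.8+ asyncio.CancelledError subclasses BaseException rather than Exception, which is why it must be named explicitly in the except clause.

import asyncio
import json


async def upstream():
    # Simulated provider stream that is cancelled part-way through.
    yield "Hello, "
    yield "wor"
    raise asyncio.CancelledError


async def stream_frames():
    try:
        async for chunk in upstream():
            yield json.dumps({"message": {"content": chunk}, "done": False})
    except (asyncio.CancelledError, Exception) as e:
        # CancelledError does not inherit from Exception, so it is listed explicitly.
        msg = (
            "Stream was cancelled by server"
            if isinstance(e, asyncio.CancelledError)
            else f"Provider error: {e}"
        )
        yield json.dumps({"message": {"content": f"\n\nError: {msg}"}, "done": False})
        yield json.dumps({"done": True})  # always close the NDJSON stream cleanly
        return
    yield json.dumps({"done": True})


async def main():
    async for line in stream_frames():
        print(line)


asyncio.run(main())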

View File

@@ -125,13 +125,17 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
-            async for chunk in response:
-                content = chunk.choices[0].delta.content
-                if content is None:
-                    continue
-                if r"\u" in content:
-                    content = safe_unicode_decode(content.encode("utf-8"))
-                yield content
+            try:
+                async for chunk in response:
+                    content = chunk.choices[0].delta.content
+                    if content is None:
+                        continue
+                    if r"\u" in content:
+                        content = safe_unicode_decode(content.encode("utf-8"))
+                    yield content
+            except Exception as e:
+                logger.error(f"Error in stream response: {str(e)}")
+                raise

         return inner()
     else:
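
Taken together, the two files mean a streaming client always receives a well-formed NDJSON tail: either the normal stats frame with "done": true, or an error frame (a chat message for provider or cancellation errors, or an "error" object with code STREAM_ERROR for generator failures) followed by a closing "done": true frame. A rough client-side sketch under that assumption follows; the endpoint URL, port, model name, and payload shape are illustrative guesses, not taken from this diff.

import json

import requests  # third-party HTTP client, used here for brevity


def chat_stream(prompt: str, url: str = "http://localhost:9621/api/chat") -> None:
    # Assumed Ollama-style chat endpoint and model name; adjust to your deployment.
    payload = {
        "model": "lightrag:latest",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }
    with requests.post(url, json=payload, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line:
                continue
            frame = json.loads(line)
            if "error" in frame:
                # STREAM_ERROR frame emitted by the outer exception handler.
                print(f"\n[stream error] {frame['error']['message']}")
            elif not frame.get("done"):
                print(frame["message"]["content"], end="", flush=True)
            else:
                # Final frame: either timing stats or the bare end marker.
                print("\n[done]")
                break


if __name__ == "__main__":
    chat_stream("Summarize the indexed documents.")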