refactor: enhance stream error handling and optimize code structure

- Initialize timestamps at start to avoid null checks
- Add detailed error handling for streaming response
- Handle CancelledError and other exceptions separately
- Unify exception handling with trace_exception
- Clean up redundant code and simplify logic
Author: yangdx
Date: 2025-02-06 02:43:06 +08:00
Committed by: ultrageopro
Parent: b90f3f14be
Commit: 1508dcb403
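
For reference, below is a minimal, self-contained sketch of the streaming pattern this commit converges on: both timestamps are initialized up front (so no None checks are needed later), asyncio.CancelledError is reported separately from provider failures, and an error frame followed by a closing "done" frame is emitted so the client never waits on a dangling stream. The function name, the model/created_at defaults, and the plain string chunks are placeholders for illustration, not LightRAG's actual OllamaAPI method.

import asyncio
import json
import logging
import time


async def stream_generator(response, model="lightrag:latest", created_at="2024-01-01T00:00:00Z"):
    # Initialize both timestamps at the start so later duration math never
    # has to guard against None.
    first_chunk_time = time.time_ns()
    last_chunk_time = first_chunk_time
    total_response = ""  # accumulated for token accounting in the real handler

    try:
        async for chunk in response:
            if not chunk:
                continue
            last_chunk_time = time.time_ns()
            total_response += chunk
            yield json.dumps(
                {"model": model, "created_at": created_at, "response": chunk, "done": False},
                ensure_ascii=False,
            ) + "\n"
    except (asyncio.CancelledError, Exception) as e:
        # Cancellation is reported differently from provider failures.
        if isinstance(e, asyncio.CancelledError):
            error_msg = "Stream was cancelled by server"
        else:
            error_msg = f"Provider error: {e}"
        logging.error(f"Stream error: {error_msg}")
        # Tell the client what happened, then close the stream cleanly.
        yield json.dumps(
            {"model": model, "created_at": created_at, "response": f"\n\nError: {error_msg}", "done": False},
            ensure_ascii=False,
        ) + "\n"
        yield json.dumps(
            {"model": model, "created_at": created_at, "done": True},
            ensure_ascii=False,
        ) + "\n"
        return

    # Normal completion: a final "done" frame ends the NDJSON stream.
    yield json.dumps(
        {"model": model, "created_at": created_at, "done": True},
        ensure_ascii=False,
    ) + "\n"

Driven with "async for line in stream_generator(some_async_iterable())", this yields NDJSON frames, and the final done frame is sent on success, cancellation, and provider error alike, which is the behavior the diff below introduces.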


@@ -203,16 +203,16 @@ class OllamaAPI:
         )

         async def stream_generator():
-            try:
-                first_chunk_time = None
-                last_chunk_time = None
-                total_response = ""
+            first_chunk_time = time.time_ns()
+            last_chunk_time = first_chunk_time
+            total_response = ""

+            try:
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
-                    first_chunk_time = time.time_ns()
-                    last_chunk_time = first_chunk_time
+                    last_chunk_time = time.time_ns()
                     total_response = response

                     data = {
@@ -241,6 +241,7 @@ class OllamaAPI:
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
                 else:
+                    try:
                         async for chunk in response:
                             if chunk:
                                 if first_chunk_time is None:
@@ -256,6 +257,32 @@ class OllamaAPI:
                                     "done": False,
                                 }
                                 yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+                        logging.error(f"Stream error: {error_msg}")
+                        # Send error message to client
+                        error_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "response": f"\n\nError: {error_msg}",
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return

                 completion_tokens = estimate_tokens(total_response)
                 total_time = last_chunk_time - start_time
@@ -381,16 +408,15 @@ class OllamaAPI:
         )

         async def stream_generator():
-            first_chunk_time = None
-            last_chunk_time = None
+            first_chunk_time = time.time_ns()
+            last_chunk_time = first_chunk_time
             total_response = ""

             try:
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
-                    first_chunk_time = time.time_ns()
-                    last_chunk_time = first_chunk_time
+                    last_chunk_time = time.time_ns()
                     total_response = response

                     data = {
@@ -474,7 +500,6 @@ class OllamaAPI:
                     yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
                     return

-                if last_chunk_time is not None:
                 completion_tokens = estimate_tokens(total_response)
                 total_time = last_chunk_time - start_time
                 prompt_eval_time = first_chunk_time - start_time
@@ -494,25 +519,8 @@ class OllamaAPI:
                 yield f"{json.dumps(data, ensure_ascii=False)}\n"
             except Exception as e:
-                error_msg = f"Error in stream_generator: {str(e)}"
-                logging.error(error_msg)
-                # Send error message to client
-                error_data = {
-                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "error": {"code": "STREAM_ERROR", "message": error_msg},
-                }
-                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-                # Ensure sending end marker
-                final_data = {
-                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "done": True,
-                }
-                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                return
+                trace_exception(e)
+                raise

         return StreamingResponse(
             stream_generator(),