refactor: enhance stream error handling and optimize code structure

- Initialize timestamps at start to avoid null checks
- Add detailed error handling for the streaming response
- Handle CancelledError and other exceptions separately
- Unify exception handling with trace_exception
- Clean up redundant code and simplify logic
yangdx
2025-02-06 02:43:06 +08:00
parent b21d38802c
commit e26c6e564d
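The bullet points above are easier to follow against a reduced example. The sketch below is not the endpoint code from this commit: it is a minimal, self-contained illustration of the same pattern, with stand-in names (fake_provider, MODEL_NAME) and trimmed payload fields; the real handlers in the diff that follows carry the full Ollama-compatible fields.

import asyncio
import json
import logging
import time
from typing import AsyncIterator

MODEL_NAME = "lightrag:latest"  # illustrative label, not the real constant


async def fake_provider() -> AsyncIterator[str]:
    """Stand-in for the upstream LLM stream; raises to exercise the error path."""
    yield "Hello, "
    yield "world."
    raise RuntimeError("upstream connection dropped")


async def stream_generator(response: AsyncIterator[str]) -> AsyncIterator[str]:
    # Timestamps are seeded up front, so the summary math below never has to
    # guard against None.
    first_chunk_time = time.time_ns()
    last_chunk_time = first_chunk_time
    total_response = ""

    try:
        async for chunk in response:
            last_chunk_time = time.time_ns()
            total_response += chunk
            yield json.dumps({"model": MODEL_NAME, "response": chunk, "done": False}) + "\n"
    except (asyncio.CancelledError, Exception) as e:
        # Cancellation is reported differently from a genuine provider failure.
        if isinstance(e, asyncio.CancelledError):
            error_msg = "Stream was cancelled by server"
        else:
            error_msg = f"Provider error: {e}"
        logging.error("Stream error: %s", error_msg)

        # Surface the error as an ordinary content chunk, then close the stream
        # with a final done=True message so NDJSON clients terminate cleanly.
        yield json.dumps({"model": MODEL_NAME, "response": f"\n\nError: {error_msg}", "done": False}) + "\n"
        yield json.dumps({"model": MODEL_NAME, "done": True}) + "\n"
        return

    # Normal completion: a closing chunk with done=True and timing/count stats
    # (the real code derives counts via its estimate_tokens helper).
    yield json.dumps(
        {
            "model": MODEL_NAME,
            "done": True,
            "eval_count": len(total_response.split()),  # crude stand-in count
            "eval_duration": last_chunk_time - first_chunk_time,
        }
    ) + "\n"


async def main() -> None:
    async for line in stream_generator(fake_provider()):
        print(line, end="")


if __name__ == "__main__":
    asyncio.run(main())

Seeding both timestamps before the try block is what lets the later summary arithmetic drop its None guard, and the except branch converts both cancellation and provider failures into ordinary NDJSON chunks, so the client always receives a done=True terminator.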

@@ -203,16 +203,16 @@ class OllamaAPI:
             )
 
         async def stream_generator():
+            first_chunk_time = time.time_ns()
+            last_chunk_time = first_chunk_time
+            total_response = ""
+
             try:
-                first_chunk_time = None
-                last_chunk_time = None
-                total_response = ""
-
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
-                    first_chunk_time = time.time_ns()
-                    last_chunk_time = first_chunk_time
+                    last_chunk_time = time.time_ns()
 
                     total_response = response
 
                     data = {
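With first_chunk_time and last_chunk_time seeded before the try block, the summary arithmetic further down the generator no longer needs a None guard; the last hunk below drops an "if last_chunk_time is not None:" check for exactly this reason. A small illustration of that arithmetic, with every value in nanoseconds (the names here follow the payload fields rather than the local variables in the diff):

# Illustration only: the three timestamps the generator tracks and the
# duration fields derived from them (all in nanoseconds, never None).
import time

start_time = time.time_ns()          # captured when the request handler starts
first_chunk_time = time.time_ns()    # seeded immediately, before any chunk arrives
last_chunk_time = first_chunk_time   # seeded immediately, updated on every chunk

# ... streaming would update first_chunk_time / last_chunk_time here ...

total_duration = last_chunk_time - start_time         # end-to-end time
prompt_eval_duration = first_chunk_time - start_time  # time before the first chunk
eval_duration = last_chunk_time - first_chunk_time    # time spent emitting chunks

print(total_duration, prompt_eval_duration, eval_duration)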
@@ -241,21 +241,48 @@ class OllamaAPI:
                     }
                     yield f"{json.dumps(data, ensure_ascii=False)}\n"
                 else:
-                    async for chunk in response:
-                        if chunk:
-                            if first_chunk_time is None:
-                                first_chunk_time = time.time_ns()
-                            last_chunk_time = time.time_ns()
-                            total_response += chunk
-                            data = {
-                                "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                                "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                                "response": chunk,
-                                "done": False,
-                            }
-                            yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    try:
+                        async for chunk in response:
+                            if chunk:
+                                if first_chunk_time is None:
+                                    first_chunk_time = time.time_ns()
+                                last_chunk_time = time.time_ns()
+                                total_response += chunk
+                                data = {
+                                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                                    "response": chunk,
+                                    "done": False,
+                                }
+                                yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                    except (asyncio.CancelledError, Exception) as e:
+                        error_msg = str(e)
+                        if isinstance(e, asyncio.CancelledError):
+                            error_msg = "Stream was cancelled by server"
+                        else:
+                            error_msg = f"Provider error: {error_msg}"
+
+                        logging.error(f"Stream error: {error_msg}")
+
+                        # Send error message to client
+                        error_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "response": f"\n\nError: {error_msg}",
+                            "done": False,
+                        }
+                        yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
+
+                        # Send final message to close the stream
+                        final_data = {
+                            "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                            "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                            "done": True,
+                        }
+                        yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
+                        return
 
                 completion_tokens = estimate_tokens(total_response)
                 total_time = last_chunk_time - start_time
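After this change, a failure in the middle of the stream still produces well-formed NDJSON: the error text arrives as an ordinary done=False chunk, followed by a done=True chunk that closes the stream. Below is a sketch of how a client might consume such a stream; the consume_ndjson helper and the sample lines are illustrative, not LightRAG code.

# Illustrative client-side handling of the NDJSON stream shown above.
import json
from typing import Iterable


def consume_ndjson(lines: Iterable[str]) -> str:
    """Accumulate response text until a done=True chunk arrives."""
    text = ""
    for line in lines:
        if not line.strip():
            continue
        chunk = json.loads(line)
        # Error chunks arrive as ordinary response text (done=False),
        # so they are appended like any other token.
        text += chunk.get("response", "")
        if chunk.get("done"):
            break
    return text


# Sample input mimicking the error path: one content chunk, one error chunk,
# then the closing done=True message.
sample = [
    '{"response": "Partial answer", "done": false}\n',
    '{"response": "\\n\\nError: Provider error: timeout", "done": false}\n',
    '{"done": true}\n',
]
print(consume_ndjson(sample))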
@@ -381,16 +408,15 @@ class OllamaAPI:
             )
 
         async def stream_generator():
-            first_chunk_time = None
-            last_chunk_time = None
+            first_chunk_time = time.time_ns()
+            last_chunk_time = first_chunk_time
             total_response = ""
 
             try:
                 # Ensure response is an async generator
                 if isinstance(response, str):
                     # If it's a string, send in two parts
-                    first_chunk_time = time.time_ns()
-                    last_chunk_time = first_chunk_time
+                    last_chunk_time = time.time_ns()
 
                     total_response = response
 
                     data = {
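Both generators also special-case a plain-string response ("send in two parts" in the comments): the whole text goes out as a single done=False chunk, followed by a closing done=True chunk that carries the timing fields. A trimmed sketch of that branch follows; the field selection is illustrative only.

import json
import time


def two_part_string_response(response: str, start_time: int):
    """Sketch: emit an already-complete string as one content chunk plus a done chunk."""
    first_chunk_time = time.time_ns()
    last_chunk_time = time.time_ns()

    # Part 1: the whole text as a single not-done chunk.
    yield json.dumps({"response": response, "done": False}, ensure_ascii=False) + "\n"

    # Part 2: the closing chunk with done=True and nanosecond timings.
    yield json.dumps(
        {
            "done": True,
            "total_duration": last_chunk_time - start_time,
            "prompt_eval_duration": first_chunk_time - start_time,
            "eval_duration": last_chunk_time - first_chunk_time,
        },
        ensure_ascii=False,
    ) + "\n"


if __name__ == "__main__":
    for line in two_part_string_response("A complete answer.", time.time_ns()):
        print(line, end="")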
@@ -474,45 +500,27 @@ class OllamaAPI:
                         yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
                         return
 
-                if last_chunk_time is not None:
-                    completion_tokens = estimate_tokens(total_response)
-                    total_time = last_chunk_time - start_time
-                    prompt_eval_time = first_chunk_time - start_time
-                    eval_time = last_chunk_time - first_chunk_time
+                completion_tokens = estimate_tokens(total_response)
+                total_time = last_chunk_time - start_time
+                prompt_eval_time = first_chunk_time - start_time
+                eval_time = last_chunk_time - first_chunk_time
 
-                    data = {
-                        "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                        "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                        "done": True,
-                        "total_duration": total_time,
-                        "load_duration": 0,
-                        "prompt_eval_count": prompt_tokens,
-                        "prompt_eval_duration": prompt_eval_time,
-                        "eval_count": completion_tokens,
-                        "eval_duration": eval_time,
-                    }
-                    yield f"{json.dumps(data, ensure_ascii=False)}\n"
+                data = {
+                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
+                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
+                    "done": True,
+                    "total_duration": total_time,
+                    "load_duration": 0,
+                    "prompt_eval_count": prompt_tokens,
+                    "prompt_eval_duration": prompt_eval_time,
+                    "eval_count": completion_tokens,
+                    "eval_duration": eval_time,
+                }
+                yield f"{json.dumps(data, ensure_ascii=False)}\n"
 
             except Exception as e:
-                error_msg = f"Error in stream_generator: {str(e)}"
-                logging.error(error_msg)
-
-                # Send error message to client
-                error_data = {
-                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "error": {"code": "STREAM_ERROR", "message": error_msg},
-                }
-                yield f"{json.dumps(error_data, ensure_ascii=False)}\n"
-
-                # Ensure sending end marker
-                final_data = {
-                    "model": self.ollama_server_infos.LIGHTRAG_MODEL,
-                    "created_at": self.ollama_server_infos.LIGHTRAG_CREATED_AT,
-                    "done": True,
-                }
-                yield f"{json.dumps(final_data, ensure_ascii=False)}\n"
-                return
+                trace_exception(e)
+                raise
 
         return StreamingResponse(
             stream_generator(),
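In the last hunk, the hand-built error and end-marker payloads in the outer except are replaced by trace_exception(e) followed by a bare raise. trace_exception is the project's own helper; the stand-in below is an assumption built only on the standard library, shown to make the unified "log the full traceback, then re-raise" shape concrete.

import logging
import traceback

logging.basicConfig(level=logging.ERROR)


def trace_exception(e: Exception) -> None:
    """Stand-in for the project's helper: log the exception with its traceback."""
    logging.error("Exception: %s\n%s", e, traceback.format_exc())


def demo() -> None:
    try:
        raise ValueError("something broke mid-stream")
    except Exception as e:
        trace_exception(e)  # one shared logging path for every handler
        raise               # let the exception propagate to the server layer


if __name__ == "__main__":
    try:
        demo()
    except ValueError:
        pass  # the re-raise reaches the caller, as it would reach the ASGI server

This keeps the outer handler to a single responsibility: record the traceback and let the server terminate the response, while stream-level error reporting stays with the chunk-loop handler shown in the earlier hunk.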