diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 68f26e4a..b339a0de 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -99,8 +99,11 @@ def create_openai_async_client(
 @retry(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=10),
-    retry=retry_if_exception_type(
-        (RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
+    retry=(
+        retry_if_exception_type(RateLimitError) |
+        retry_if_exception_type(APIConnectionError) |
+        retry_if_exception_type(APITimeoutError) |
+        retry_if_exception_type(InvalidResponseError)
     ),
 )
 async def openai_complete_if_cache(
@@ -200,7 +203,10 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
+            # Track if we've started iterating
+            iteration_started = False
             try:
+                iteration_started = True
                 async for chunk in response:
                     # Check if choices exists and is not empty
                     if not hasattr(chunk, "choices") or not chunk.choices:
@@ -223,7 +229,22 @@ async def openai_complete_if_cache(
                     yield content
             except Exception as e:
                 logger.error(f"Error in stream response: {str(e)}")
+                # Try to clean up resources if possible
+                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                    try:
+                        await response.aclose()
+                        logger.debug("Successfully closed stream response after error")
+                    except Exception as close_error:
+                        logger.warning(f"Failed to close stream response: {close_error}")
                 raise
+            finally:
+                # Ensure resources are released even if no exception occurs
+                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                    try:
+                        await response.aclose()
+                        logger.debug("Successfully closed stream response")
+                    except Exception as close_error:
+                        logger.warning(f"Failed to close stream response in finally block: {close_error}")

         return inner()

@@ -351,8 +372,10 @@ async def nvidia_openai_complete(
 @retry(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=60),
-    retry=retry_if_exception_type(
-        (RateLimitError, APIConnectionError, APITimeoutError)
+    retry=(
+        retry_if_exception_type(RateLimitError) |
+        retry_if_exception_type(APIConnectionError) |
+        retry_if_exception_type(APITimeoutError)
     ),
 )
 async def openai_embed(
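
Note on the stream-cleanup pattern introduced above: the wrapper records whether iteration began and, on any exit path, awaits the response's aclose() when one is exposed. Below is a minimal, self-contained sketch of that idea, not lightrag's implementation; FakeStream and wrap_stream are hypothetical names used only for illustration, and the error/finally branches are collapsed into a single finally block for brevity.

    # Illustrative sketch only: wrap an async iterator and make sure its
    # aclose() is awaited once the consumer finishes, errors, or stops early.
    # FakeStream and wrap_stream are not part of lightrag or the OpenAI SDK.
    import asyncio
    import logging

    logger = logging.getLogger(__name__)


    class FakeStream:
        """Stand-in for an SDK streaming response exposing __aiter__ and aclose()."""

        def __init__(self, chunks):
            self._chunks = chunks
            self.closed = False

        def __aiter__(self):
            return self._gen()

        async def _gen(self):
            for chunk in self._chunks:
                yield chunk

        async def aclose(self):
            self.closed = True


    def wrap_stream(response):
        async def inner():
            iteration_started = False
            try:
                iteration_started = True
                async for chunk in response:
                    yield chunk
            finally:
                # Release the underlying stream whether iteration completed,
                # raised, or the consumer abandoned the generator early.
                if iteration_started and callable(getattr(response, "aclose", None)):
                    try:
                        await response.aclose()
                    except Exception as close_error:
                        logger.warning(f"Failed to close stream response: {close_error}")

        return inner()


    async def main():
        stream = FakeStream(["a", "b", "c"])
        async for piece in wrap_stream(stream):
            print(piece)
        print("closed:", stream.closed)  # True: aclose() ran in the finally block


    if __name__ == "__main__":
        asyncio.run(main())

On the retry change: combining tenacity predicates with | builds a retry_any condition, which is behaviorally equivalent to passing a tuple of exception types to a single retry_if_exception_type call; the |-form simply makes each retried exception explicit on its own line.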