From f58c8276bcc84cdf6dd0a9adbfb1d4135ccd393e Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 29 Apr 2025 17:43:27 +0800
Subject: [PATCH 1/2] fix: correct retry_if_exception_type usage and improve
 async iterator resource management

- Corrects the syntax of retry_if_exception_type decorators to ensure proper
  exception handling and retry behavior
- Implements proper resource cleanup for async iterators to prevent memory
  leaks and potential SIGSEGV errors
---
 lightrag/llm/openai.py | 31 +++++++++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 68f26e4a..b339a0de 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -99,8 +99,11 @@ def create_openai_async_client(
 @retry(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=10),
-    retry=retry_if_exception_type(
-        (RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
+    retry=(
+        retry_if_exception_type(RateLimitError) |
+        retry_if_exception_type(APIConnectionError) |
+        retry_if_exception_type(APITimeoutError) |
+        retry_if_exception_type(InvalidResponseError)
     ),
 )
 async def openai_complete_if_cache(
@@ -200,7 +203,10 @@ async def openai_complete_if_cache(
     if hasattr(response, "__aiter__"):

         async def inner():
+            # Track if we've started iterating
+            iteration_started = False
             try:
+                iteration_started = True
                 async for chunk in response:
                     # Check if choices exists and is not empty
                     if not hasattr(chunk, "choices") or not chunk.choices:
@@ -223,7 +229,22 @@ async def openai_complete_if_cache(
                     yield content
             except Exception as e:
                 logger.error(f"Error in stream response: {str(e)}")
+                # Try to clean up resources if possible
+                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                    try:
+                        await response.aclose()
+                        logger.debug("Successfully closed stream response after error")
+                    except Exception as close_error:
+                        logger.warning(f"Failed to close stream response: {close_error}")
                 raise
+            finally:
+                # Ensure resources are released even if no exception occurs
+                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                    try:
+                        await response.aclose()
+                        logger.debug("Successfully closed stream response")
+                    except Exception as close_error:
+                        logger.warning(f"Failed to close stream response in finally block: {close_error}")

         return inner()
@@ -351,8 +372,10 @@ async def nvidia_openai_complete(
 @retry(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=60),
-    retry=retry_if_exception_type(
-        (RateLimitError, APIConnectionError, APITimeoutError)
+    retry=(
+        retry_if_exception_type(RateLimitError) |
+        retry_if_exception_type(APIConnectionError) |
+        retry_if_exception_type(APITimeoutError)
     ),
 )
 async def openai_embed(

From 34cc8b6a517ee7bde8d8f6a8ca7eddb963debefd Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 29 Apr 2025 17:52:07 +0800
Subject: [PATCH 2/2] Fix linting

---
 lightrag/llm/openai.py | 34 +++++++++++++++++++++++-----------
 1 file changed, 23 insertions(+), 11 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index b339a0de..2f01c969 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -100,10 +100,10 @@ def create_openai_async_client(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=10),
     retry=(
-        retry_if_exception_type(RateLimitError) |
-        retry_if_exception_type(APIConnectionError) |
-        retry_if_exception_type(APITimeoutError) |
-        retry_if_exception_type(InvalidResponseError)
+        retry_if_exception_type(RateLimitError)
+        | retry_if_exception_type(APIConnectionError)
+        | retry_if_exception_type(APITimeoutError)
+        | retry_if_exception_type(InvalidResponseError)
     ),
 )
 async def openai_complete_if_cache(
@@ -230,21 +230,33 @@ async def openai_complete_if_cache(
             except Exception as e:
                 logger.error(f"Error in stream response: {str(e)}")
                 # Try to clean up resources if possible
-                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                if (
+                    iteration_started
+                    and hasattr(response, "aclose")
+                    and callable(getattr(response, "aclose", None))
+                ):
                     try:
                         await response.aclose()
                         logger.debug("Successfully closed stream response after error")
                     except Exception as close_error:
-                        logger.warning(f"Failed to close stream response: {close_error}")
+                        logger.warning(
+                            f"Failed to close stream response: {close_error}"
+                        )
                 raise
             finally:
                 # Ensure resources are released even if no exception occurs
-                if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
+                if (
+                    iteration_started
+                    and hasattr(response, "aclose")
+                    and callable(getattr(response, "aclose", None))
+                ):
                     try:
                         await response.aclose()
                         logger.debug("Successfully closed stream response")
                     except Exception as close_error:
-                        logger.warning(f"Failed to close stream response in finally block: {close_error}")
+                        logger.warning(
+                            f"Failed to close stream response in finally block: {close_error}"
+                        )

         return inner()
@@ -373,9 +385,9 @@ async def nvidia_openai_complete(
     stop=stop_after_attempt(3),
     wait=wait_exponential(multiplier=1, min=4, max=60),
     retry=(
-        retry_if_exception_type(RateLimitError) |
-        retry_if_exception_type(APIConnectionError) |
-        retry_if_exception_type(APITimeoutError)
+        retry_if_exception_type(RateLimitError)
+        | retry_if_exception_type(APIConnectionError)
+        | retry_if_exception_type(APITimeoutError)
     ),
 )
 async def openai_embed(