fix: correct retry_if_exception_type usage and improve async iterator resource management
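- Corrects the retry_if_exception_type usage in the @retry decorators (each exception type now gets its own predicate, combined with `|`) to ensure proper exception handling and retry behavior
- Implements proper resource cleanup for async iterators (the stream response is closed via aclose() both on error and in a finally block) to prevent memory leaks and potential SIGSEGV errors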
This commit is contained in:
@@ -99,8 +99,11 @@ def create_openai_async_client(
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||
retry=retry_if_exception_type(
|
||||
(RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
|
||||
retry=(
|
||||
retry_if_exception_type(RateLimitError) |
|
||||
retry_if_exception_type(APIConnectionError) |
|
||||
retry_if_exception_type(APITimeoutError) |
|
||||
retry_if_exception_type(InvalidResponseError)
|
||||
),
|
||||
)
|
||||
async def openai_complete_if_cache(
|
||||
@@ -200,7 +203,10 @@ async def openai_complete_if_cache(
|
||||
if hasattr(response, "__aiter__"):
|
||||
|
||||
async def inner():
|
||||
# Track if we've started iterating
|
||||
iteration_started = False
|
||||
try:
|
||||
iteration_started = True
|
||||
async for chunk in response:
|
||||
# Check if choices exists and is not empty
|
||||
if not hasattr(chunk, "choices") or not chunk.choices:
|
||||
@@ -223,7 +229,22 @@ async def openai_complete_if_cache(
|
||||
yield content
|
||||
except Exception as e:
|
||||
logger.error(f"Error in stream response: {str(e)}")
|
||||
# Try to clean up resources if possible
|
||||
if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
|
||||
try:
|
||||
await response.aclose()
|
||||
logger.debug("Successfully closed stream response after error")
|
||||
except Exception as close_error:
|
||||
logger.warning(f"Failed to close stream response: {close_error}")
|
||||
raise
|
||||
finally:
|
||||
# Ensure resources are released even if no exception occurs
|
||||
if iteration_started and hasattr(response, "aclose") and callable(getattr(response, "aclose", None)):
|
||||
try:
|
||||
await response.aclose()
|
||||
logger.debug("Successfully closed stream response")
|
||||
except Exception as close_error:
|
||||
logger.warning(f"Failed to close stream response in finally block: {close_error}")
|
||||
|
||||
return inner()
|
||||
|
||||
@@ -351,8 +372,10 @@ async def nvidia_openai_complete(
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||
retry=retry_if_exception_type(
|
||||
(RateLimitError, APIConnectionError, APITimeoutError)
|
||||
retry=(
|
||||
retry_if_exception_type(RateLimitError) |
|
||||
retry_if_exception_type(APIConnectionError) |
|
||||
retry_if_exception_type(APITimeoutError)
|
||||
),
|
||||
)
|
||||
async def openai_embed(
|
||||
|
Reference in New Issue
Block a user