Merge pull request #1492 from danielaskdd/fix-openai-retry
fix: correct retry_if_exception_type usage and improve async iterator…
This commit is contained in:
@@ -99,8 +99,11 @@ def create_openai_async_client(
|
|||||||
@retry(
|
@retry(
|
||||||
stop=stop_after_attempt(3),
|
stop=stop_after_attempt(3),
|
||||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||||
retry=retry_if_exception_type(
|
retry=(
|
||||||
(RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
|
retry_if_exception_type(RateLimitError)
|
||||||
|
| retry_if_exception_type(APIConnectionError)
|
||||||
|
| retry_if_exception_type(APITimeoutError)
|
||||||
|
| retry_if_exception_type(InvalidResponseError)
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
async def openai_complete_if_cache(
|
async def openai_complete_if_cache(
|
||||||
@@ -200,7 +203,10 @@ async def openai_complete_if_cache(
|
|||||||
if hasattr(response, "__aiter__"):
|
if hasattr(response, "__aiter__"):
|
||||||
|
|
||||||
async def inner():
|
async def inner():
|
||||||
|
# Track if we've started iterating
|
||||||
|
iteration_started = False
|
||||||
try:
|
try:
|
||||||
|
iteration_started = True
|
||||||
async for chunk in response:
|
async for chunk in response:
|
||||||
# Check if choices exists and is not empty
|
# Check if choices exists and is not empty
|
||||||
if not hasattr(chunk, "choices") or not chunk.choices:
|
if not hasattr(chunk, "choices") or not chunk.choices:
|
||||||
@@ -223,7 +229,34 @@ async def openai_complete_if_cache(
|
|||||||
yield content
|
yield content
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error in stream response: {str(e)}")
|
logger.error(f"Error in stream response: {str(e)}")
|
||||||
|
# Try to clean up resources if possible
|
||||||
|
if (
|
||||||
|
iteration_started
|
||||||
|
and hasattr(response, "aclose")
|
||||||
|
and callable(getattr(response, "aclose", None))
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
await response.aclose()
|
||||||
|
logger.debug("Successfully closed stream response after error")
|
||||||
|
except Exception as close_error:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to close stream response: {close_error}"
|
||||||
|
)
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Ensure resources are released even if no exception occurs
|
||||||
|
if (
|
||||||
|
iteration_started
|
||||||
|
and hasattr(response, "aclose")
|
||||||
|
and callable(getattr(response, "aclose", None))
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
await response.aclose()
|
||||||
|
logger.debug("Successfully closed stream response")
|
||||||
|
except Exception as close_error:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to close stream response in finally block: {close_error}"
|
||||||
|
)
|
||||||
|
|
||||||
return inner()
|
return inner()
|
||||||
|
|
||||||
@@ -351,8 +384,10 @@ async def nvidia_openai_complete(
|
|||||||
@retry(
|
@retry(
|
||||||
stop=stop_after_attempt(3),
|
stop=stop_after_attempt(3),
|
||||||
wait=wait_exponential(multiplier=1, min=4, max=60),
|
wait=wait_exponential(multiplier=1, min=4, max=60),
|
||||||
retry=retry_if_exception_type(
|
retry=(
|
||||||
(RateLimitError, APIConnectionError, APITimeoutError)
|
retry_if_exception_type(RateLimitError)
|
||||||
|
| retry_if_exception_type(APIConnectionError)
|
||||||
|
| retry_if_exception_type(APITimeoutError)
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
async def openai_embed(
|
async def openai_embed(
|
||||||
|
Reference in New Issue
Block a user