Fix linting
This commit is contained in:
@@ -271,18 +271,21 @@ def compute_mdhash_id(content: str, prefix: str = "") -> str:
|
|||||||
# Custom exception class
|
# Custom exception class
|
||||||
class QueueFullError(Exception):
|
class QueueFullError(Exception):
|
||||||
"""Raised when the queue is full and the wait times out"""
|
"""Raised when the queue is full and the wait times out"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
||||||
"""
|
"""
|
||||||
Enhanced priority-limited asynchronous function call decorator
|
Enhanced priority-limited asynchronous function call decorator
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
max_size: Maximum number of concurrent calls
|
max_size: Maximum number of concurrent calls
|
||||||
max_queue_size: Maximum queue capacity to prevent memory overflow
|
max_queue_size: Maximum queue capacity to prevent memory overflow
|
||||||
Returns:
|
Returns:
|
||||||
Decorator function
|
Decorator function
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def final_decro(func):
|
def final_decro(func):
|
||||||
queue = asyncio.PriorityQueue(maxsize=max_queue_size)
|
queue = asyncio.PriorityQueue(maxsize=max_queue_size)
|
||||||
tasks = set()
|
tasks = set()
|
||||||
@@ -290,10 +293,10 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
counter = 0
|
counter = 0
|
||||||
shutdown_event = asyncio.Event()
|
shutdown_event = asyncio.Event()
|
||||||
worker_health_check_task = None
|
worker_health_check_task = None
|
||||||
|
|
||||||
# Track active future objects for cleanup
|
# Track active future objects for cleanup
|
||||||
active_futures = weakref.WeakSet()
|
active_futures = weakref.WeakSet()
|
||||||
|
|
||||||
# Worker function to process tasks in the queue
|
# Worker function to process tasks in the queue
|
||||||
async def worker():
|
async def worker():
|
||||||
"""Worker that processes tasks in the priority queue"""
|
"""Worker that processes tasks in the priority queue"""
|
||||||
@@ -302,18 +305,22 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
try:
|
try:
|
||||||
# Use timeout to get tasks, allowing periodic checking of shutdown signal
|
# Use timeout to get tasks, allowing periodic checking of shutdown signal
|
||||||
try:
|
try:
|
||||||
priority, count, future, args, kwargs = await asyncio.wait_for(
|
(
|
||||||
queue.get(), timeout=1.0
|
priority,
|
||||||
)
|
count,
|
||||||
|
future,
|
||||||
|
args,
|
||||||
|
kwargs,
|
||||||
|
) = await asyncio.wait_for(queue.get(), timeout=1.0)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
# Timeout is just to check shutdown signal, continue to next iteration
|
# Timeout is just to check shutdown signal, continue to next iteration
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If future is cancelled, skip execution
|
# If future is cancelled, skip execution
|
||||||
if future.cancelled():
|
if future.cancelled():
|
||||||
queue.task_done()
|
queue.task_done()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Execute function
|
# Execute function
|
||||||
result = await func(*args, **kwargs)
|
result = await func(*args, **kwargs)
|
||||||
@@ -325,7 +332,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
future.cancel()
|
future.cancel()
|
||||||
logger.debug("limit_async: Task cancelled during execution")
|
logger.debug("limit_async: Task cancelled during execution")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"limit_async: Error in decorated function: {str(e)}")
|
logger.error(
|
||||||
|
f"limit_async: Error in decorated function: {str(e)}"
|
||||||
|
)
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_exception(e)
|
future.set_exception(e)
|
||||||
finally:
|
finally:
|
||||||
@@ -336,22 +345,24 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
await asyncio.sleep(0.1) # Prevent high CPU usage
|
await asyncio.sleep(0.1) # Prevent high CPU usage
|
||||||
finally:
|
finally:
|
||||||
logger.warning("limit_async: Worker exiting")
|
logger.warning("limit_async: Worker exiting")
|
||||||
|
|
||||||
async def health_check():
|
async def health_check():
|
||||||
"""Periodically check worker health status and recover"""
|
"""Periodically check worker health status and recover"""
|
||||||
try:
|
try:
|
||||||
while not shutdown_event.is_set():
|
while not shutdown_event.is_set():
|
||||||
await asyncio.sleep(5) # Check every 5 seconds
|
await asyncio.sleep(5) # Check every 5 seconds
|
||||||
|
|
||||||
async with lock:
|
async with lock:
|
||||||
# Directly remove completed tasks from the tasks set
|
# Directly remove completed tasks from the tasks set
|
||||||
tasks.difference_update({t for t in tasks if t.done()})
|
tasks.difference_update({t for t in tasks if t.done()})
|
||||||
|
|
||||||
# Create new workers if active tasks less than max_size for better performance
|
# Create new workers if active tasks less than max_size for better performance
|
||||||
active_tasks_count = len(tasks)
|
active_tasks_count = len(tasks)
|
||||||
workers_needed = max_size - active_tasks_count
|
workers_needed = max_size - active_tasks_count
|
||||||
if workers_needed > 0:
|
if workers_needed > 0:
|
||||||
logger.info(f"limit_async: Creating {workers_needed} new workers")
|
logger.info(
|
||||||
|
f"limit_async: Creating {workers_needed} new workers"
|
||||||
|
)
|
||||||
for _ in range(workers_needed):
|
for _ in range(workers_needed):
|
||||||
task = asyncio.create_task(worker())
|
task = asyncio.create_task(worker())
|
||||||
tasks.add(task)
|
tasks.add(task)
|
||||||
@@ -360,7 +371,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
logger.error(f"limit_async: Error in health check: {str(e)}")
|
logger.error(f"limit_async: Error in health check: {str(e)}")
|
||||||
finally:
|
finally:
|
||||||
logger.warning("limit_async: Health check task exiting")
|
logger.warning("limit_async: Health check task exiting")
|
||||||
|
|
||||||
# Ensure worker tasks are started
|
# Ensure worker tasks are started
|
||||||
async def ensure_workers():
|
async def ensure_workers():
|
||||||
"""Ensure worker tasks and health check are started"""
|
"""Ensure worker tasks and health check are started"""
|
||||||
@@ -373,20 +384,25 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
# Try to acquire the lock, wait up to 5 seconds
|
# Try to acquire the lock, wait up to 5 seconds
|
||||||
lock_acquired = await asyncio.wait_for(lock.acquire(), timeout=5.0)
|
lock_acquired = await asyncio.wait_for(lock.acquire(), timeout=5.0)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.error("limit_async: Timeout acquiring lock in ensure_workers")
|
logger.error(
|
||||||
|
"limit_async: Timeout acquiring lock in ensure_workers"
|
||||||
|
)
|
||||||
# Even if acquiring the lock times out, continue trying to create workers
|
# Even if acquiring the lock times out, continue trying to create workers
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Start the health check task (if not already started)
|
# Start the health check task (if not already started)
|
||||||
if worker_health_check_task is None or worker_health_check_task.done():
|
if (
|
||||||
|
worker_health_check_task is None
|
||||||
|
or worker_health_check_task.done()
|
||||||
|
):
|
||||||
worker_health_check_task = asyncio.create_task(health_check())
|
worker_health_check_task = asyncio.create_task(health_check())
|
||||||
|
|
||||||
# Directly remove completed tasks from the tasks set
|
# Directly remove completed tasks from the tasks set
|
||||||
tasks.difference_update({t for t in tasks if t.done()})
|
tasks.difference_update({t for t in tasks if t.done()})
|
||||||
|
|
||||||
# Calculate the number of active tasks
|
# Calculate the number of active tasks
|
||||||
active_tasks_count = len(tasks)
|
active_tasks_count = len(tasks)
|
||||||
|
|
||||||
# If active tasks count is less than max_size, create new workers
|
# If active tasks count is less than max_size, create new workers
|
||||||
workers_needed = max_size - active_tasks_count
|
workers_needed = max_size - active_tasks_count
|
||||||
if workers_needed > 0:
|
if workers_needed > 0:
|
||||||
@@ -422,7 +438,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
try:
|
try:
|
||||||
await asyncio.wait_for(queue.join(), timeout=5.0)
|
await asyncio.wait_for(queue.join(), timeout=5.0)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.warning("limit_async: Timeout waiting for queue to empty during shutdown")
|
logger.warning(
|
||||||
|
"limit_async: Timeout waiting for queue to empty during shutdown"
|
||||||
|
)
|
||||||
|
|
||||||
# Cancel all worker tasks
|
# Cancel all worker tasks
|
||||||
for task in list(tasks):
|
for task in list(tasks):
|
||||||
@@ -444,7 +462,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
logger.info("limit_async: Priority queue workers shutdown complete")
|
logger.info("limit_async: Priority queue workers shutdown complete")
|
||||||
|
|
||||||
@wraps(func)
|
@wraps(func)
|
||||||
async def wait_func(*args, _priority=10, _timeout=None, _queue_timeout=None, **kwargs):
|
async def wait_func(
|
||||||
|
*args, _priority=10, _timeout=None, _queue_timeout=None, **kwargs
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Execute the function with priority-based concurrency control
|
Execute the function with priority-based concurrency control
|
||||||
Args:
|
Args:
|
||||||
@@ -479,10 +499,12 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
try:
|
try:
|
||||||
await asyncio.wait_for(
|
await asyncio.wait_for(
|
||||||
queue.put((_priority, current_count, future, args, kwargs)),
|
queue.put((_priority, current_count, future, args, kwargs)),
|
||||||
timeout=_queue_timeout
|
timeout=_queue_timeout,
|
||||||
)
|
)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
raise QueueFullError(f"Queue full, timeout after {_queue_timeout} seconds")
|
raise QueueFullError(
|
||||||
|
f"Queue full, timeout after {_queue_timeout} seconds"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# No timeout, may wait indefinitely
|
# No timeout, may wait indefinitely
|
||||||
await queue.put((_priority, current_count, future, args, kwargs))
|
await queue.put((_priority, current_count, future, args, kwargs))
|
||||||
@@ -502,7 +524,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
# Cancel the future
|
# Cancel the future
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.cancel()
|
future.cancel()
|
||||||
raise TimeoutError(f"limit_async: Task timed out after {_timeout} seconds")
|
raise TimeoutError(
|
||||||
|
f"limit_async: Task timed out after {_timeout} seconds"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Wait for the result without timeout
|
# Wait for the result without timeout
|
||||||
return await future
|
return await future
|
||||||
@@ -514,7 +538,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
wait_func.shutdown = shutdown
|
wait_func.shutdown = shutdown
|
||||||
|
|
||||||
return wait_func
|
return wait_func
|
||||||
|
|
||||||
return final_decro
|
return final_decro
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user