Enhance the robustness of concurrency control and scheduling logic

This commit is contained in:
yangdx
2025-04-29 13:38:11 +08:00
parent 5f3e210246
commit 81953e6d46

View File

@@ -297,6 +297,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
# Track active future objects for cleanup
active_futures = weakref.WeakSet()
reinit_count = 0 # Reinitialization counter to track system health
# Worker function to process tasks in the queue
async def worker():
@@ -349,6 +350,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
async def health_check():
"""Periodically check worker health status and recover"""
nonlocal initialized
try:
while not shutdown_event.is_set():
await asyncio.sleep(5) # Check every 5 seconds
@@ -378,6 +380,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
logger.error(f"limit_async: Error in health check: {str(e)}")
finally:
logger.warning("limit_async: Health check task exiting")
initialized = False
async def ensure_workers():
"""Ensure worker threads and health check system are available
@@ -386,7 +389,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
If not, it performs a one-time initialization of all worker threads
and starts the health check system.
"""
nonlocal initialized, worker_health_check_task, tasks
nonlocal initialized, worker_health_check_task, tasks, reinit_count
if initialized:
return
@@ -395,10 +398,30 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
if initialized:
return
logger.info("limit_async: Initializing worker system")
# Increment reinitialization counter if this is not the first initialization
if reinit_count > 0:
reinit_count += 1
logger.warning(
f"limit_async: Reinitializing needed (count: {reinit_count})"
)
else:
reinit_count = 1 # First initialization
# Create initial worker tasks
for _ in range(max_size):
# Check for completed tasks and remove them from the task set
current_tasks = set(tasks)
done_tasks = {t for t in current_tasks if t.done()}
tasks.difference_update(done_tasks)
# Log active tasks count during reinitialization
active_tasks_count = len(tasks)
if active_tasks_count > 0 and reinit_count > 1:
logger.warning(
f"limit_async: {active_tasks_count} tasks still running during reinitialization"
)
# Create initial worker tasks, only adding the number needed
workers_needed = max_size - active_tasks_count
for _ in range(workers_needed):
task = asyncio.create_task(worker())
tasks.add(task)
task.add_done_callback(tasks.discard)
@@ -407,7 +430,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
worker_health_check_task = asyncio.create_task(health_check())
initialized = True
logger.info("limit_async: Worker system initialized")
logger.info(f"limit_async: {workers_needed} new workers initialized")
async def shutdown():
"""Gracefully shut down all workers and the queue"""
@@ -476,7 +499,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
nonlocal counter
async with initialization_lock:
current_count = counter
current_count = counter # Use local variable to avoid race conditions
counter += 1
# Try to put the task into the queue, supporting timeout
@@ -485,6 +508,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
# Use timeout to wait for queue space
try:
await asyncio.wait_for(
# current_count is used to ensure FIFO order
queue.put((_priority, current_count, future, args, kwargs)),
timeout=_queue_timeout,
)
@@ -494,6 +518,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
)
else:
# No timeout, may wait indefinitely
# current_count is used to ensure FIFO order
await queue.put((_priority, current_count, future, args, kwargs))
except Exception as e:
# Clean up the future