Merge pull request #1488 from danielaskdd/improve-limit-async
Enhance the robustness of concurrency control and scheduling logic
This commit is contained in:
@@ -112,31 +112,26 @@ class UnifiedLock(Generic[T]):
|
|||||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||||
main_lock_released = False
|
main_lock_released = False
|
||||||
try:
|
try:
|
||||||
# direct_log(
|
|
||||||
# f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
|
|
||||||
# enable_output=self._enable_logging,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# Release main lock first
|
# Release main lock first
|
||||||
if self._is_async:
|
if self._is_async:
|
||||||
self._lock.release()
|
self._lock.release()
|
||||||
else:
|
else:
|
||||||
self._lock.release()
|
self._lock.release()
|
||||||
|
|
||||||
main_lock_released = True
|
main_lock_released = True
|
||||||
|
|
||||||
# Then release async lock if in multiprocess mode
|
|
||||||
if not self._is_async and self._async_lock is not None:
|
|
||||||
# direct_log(
|
|
||||||
# f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'",
|
|
||||||
# enable_output=self._enable_logging,
|
|
||||||
# )
|
|
||||||
self._async_lock.release()
|
|
||||||
|
|
||||||
direct_log(
|
direct_log(
|
||||||
f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
|
f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
|
||||||
enable_output=self._enable_logging,
|
enable_output=self._enable_logging,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Then release async lock if in multiprocess mode
|
||||||
|
if not self._is_async and self._async_lock is not None:
|
||||||
|
self._async_lock.release()
|
||||||
|
direct_log(
|
||||||
|
f"== Lock == Process {self._pid}: Async lock '{self._name}' released",
|
||||||
|
enable_output=self._enable_logging,
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
direct_log(
|
direct_log(
|
||||||
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
|
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
|
||||||
|
@@ -297,6 +297,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
|
|
||||||
# Track active future objects for cleanup
|
# Track active future objects for cleanup
|
||||||
active_futures = weakref.WeakSet()
|
active_futures = weakref.WeakSet()
|
||||||
|
reinit_count = 0 # Reinitialization counter to track system health
|
||||||
|
|
||||||
# Worker function to process tasks in the queue
|
# Worker function to process tasks in the queue
|
||||||
async def worker():
|
async def worker():
|
||||||
@@ -349,6 +350,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
|
|
||||||
async def health_check():
|
async def health_check():
|
||||||
"""Periodically check worker health status and recover"""
|
"""Periodically check worker health status and recover"""
|
||||||
|
nonlocal initialized
|
||||||
try:
|
try:
|
||||||
while not shutdown_event.is_set():
|
while not shutdown_event.is_set():
|
||||||
await asyncio.sleep(5) # Check every 5 seconds
|
await asyncio.sleep(5) # Check every 5 seconds
|
||||||
@@ -378,6 +380,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
logger.error(f"limit_async: Error in health check: {str(e)}")
|
logger.error(f"limit_async: Error in health check: {str(e)}")
|
||||||
finally:
|
finally:
|
||||||
logger.warning("limit_async: Health check task exiting")
|
logger.warning("limit_async: Health check task exiting")
|
||||||
|
initialized = False
|
||||||
|
|
||||||
async def ensure_workers():
|
async def ensure_workers():
|
||||||
"""Ensure worker threads and health check system are available
|
"""Ensure worker threads and health check system are available
|
||||||
@@ -386,7 +389,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
If not, it performs a one-time initialization of all worker threads
|
If not, it performs a one-time initialization of all worker threads
|
||||||
and starts the health check system.
|
and starts the health check system.
|
||||||
"""
|
"""
|
||||||
nonlocal initialized, worker_health_check_task, tasks
|
nonlocal initialized, worker_health_check_task, tasks, reinit_count
|
||||||
|
|
||||||
if initialized:
|
if initialized:
|
||||||
return
|
return
|
||||||
@@ -395,10 +398,30 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
if initialized:
|
if initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info("limit_async: Initializing worker system")
|
# Increment reinitialization counter if this is not the first initialization
|
||||||
|
if reinit_count > 0:
|
||||||
|
reinit_count += 1
|
||||||
|
logger.warning(
|
||||||
|
f"limit_async: Reinitializing needed (count: {reinit_count})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
reinit_count = 1 # First initialization
|
||||||
|
|
||||||
# Create initial worker tasks
|
# Check for completed tasks and remove them from the task set
|
||||||
for _ in range(max_size):
|
current_tasks = set(tasks)
|
||||||
|
done_tasks = {t for t in current_tasks if t.done()}
|
||||||
|
tasks.difference_update(done_tasks)
|
||||||
|
|
||||||
|
# Log active tasks count during reinitialization
|
||||||
|
active_tasks_count = len(tasks)
|
||||||
|
if active_tasks_count > 0 and reinit_count > 1:
|
||||||
|
logger.warning(
|
||||||
|
f"limit_async: {active_tasks_count} tasks still running during reinitialization"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create initial worker tasks, only adding the number needed
|
||||||
|
workers_needed = max_size - active_tasks_count
|
||||||
|
for _ in range(workers_needed):
|
||||||
task = asyncio.create_task(worker())
|
task = asyncio.create_task(worker())
|
||||||
tasks.add(task)
|
tasks.add(task)
|
||||||
task.add_done_callback(tasks.discard)
|
task.add_done_callback(tasks.discard)
|
||||||
@@ -407,7 +430,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
worker_health_check_task = asyncio.create_task(health_check())
|
worker_health_check_task = asyncio.create_task(health_check())
|
||||||
|
|
||||||
initialized = True
|
initialized = True
|
||||||
logger.info("limit_async: Worker system initialized")
|
logger.info(f"limit_async: {workers_needed} new workers initialized")
|
||||||
|
|
||||||
async def shutdown():
|
async def shutdown():
|
||||||
"""Gracefully shut down all workers and the queue"""
|
"""Gracefully shut down all workers and the queue"""
|
||||||
@@ -476,7 +499,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
|
|
||||||
nonlocal counter
|
nonlocal counter
|
||||||
async with initialization_lock:
|
async with initialization_lock:
|
||||||
current_count = counter
|
current_count = counter # Use local variable to avoid race conditions
|
||||||
counter += 1
|
counter += 1
|
||||||
|
|
||||||
# Try to put the task into the queue, supporting timeout
|
# Try to put the task into the queue, supporting timeout
|
||||||
@@ -485,6 +508,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
# Use timeout to wait for queue space
|
# Use timeout to wait for queue space
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(
|
await asyncio.wait_for(
|
||||||
|
# current_count is used to ensure FIFO order
|
||||||
queue.put((_priority, current_count, future, args, kwargs)),
|
queue.put((_priority, current_count, future, args, kwargs)),
|
||||||
timeout=_queue_timeout,
|
timeout=_queue_timeout,
|
||||||
)
|
)
|
||||||
@@ -494,6 +518,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# No timeout, may wait indefinitely
|
# No timeout, may wait indefinitely
|
||||||
|
# current_count is used to ensure FIFO order
|
||||||
await queue.put((_priority, current_count, future, args, kwargs))
|
await queue.put((_priority, current_count, future, args, kwargs))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Clean up the future
|
# Clean up the future
|
||||||
|
Reference in New Issue
Block a user