From 5f3e210246e20a4b6ea4ee725d313d65f1ae3e4f Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 29 Apr 2025 13:32:05 +0800 Subject: [PATCH 1/3] Optimize log messages --- lightrag/kg/shared_storage.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 10e83e56..b5d56f5d 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -122,16 +122,20 @@ class UnifiedLock(Generic[T]): self._lock.release() else: self._lock.release() - main_lock_released = True + direct_log( + f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", + enable_output=self._enable_logging, + ) + # Then release async lock if in multiprocess mode if not self._is_async and self._async_lock is not None: - # direct_log( - # f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'", - # enable_output=self._enable_logging, - # ) self._async_lock.release() + direct_log( + f"== Lock == Process {self._pid}: Async lock '{self._name}' released", + enable_output=self._enable_logging, + ) direct_log( f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", From 81953e6d46afb342645ce619ac20ed9580e667a3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 29 Apr 2025 13:38:11 +0800 Subject: [PATCH 2/3] Enhance the robustness of concurrency control and scheduling logic --- lightrag/utils.py | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/lightrag/utils.py b/lightrag/utils.py index 574e3ecd..7520da4b 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -297,6 +297,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): # Track active future objects for cleanup active_futures = weakref.WeakSet() + reinit_count = 0 # Reinitialization counter to track system health # Worker function to process tasks in the queue async def worker(): @@ -349,6 +350,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): async def health_check(): """Periodically check worker health status and recover""" + nonlocal initialized try: while not shutdown_event.is_set(): await asyncio.sleep(5) # Check every 5 seconds @@ -378,6 +380,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): logger.error(f"limit_async: Error in health check: {str(e)}") finally: logger.warning("limit_async: Health check task exiting") + initialized = False async def ensure_workers(): """Ensure worker threads and health check system are available @@ -386,7 +389,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): If not, it performs a one-time initialization of all worker threads and starts the health check system. """ - nonlocal initialized, worker_health_check_task, tasks + nonlocal initialized, worker_health_check_task, tasks, reinit_count if initialized: return @@ -395,10 +398,30 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): if initialized: return - logger.info("limit_async: Initializing worker system") + # Increment reinitialization counter if this is not the first initialization + if reinit_count > 0: + reinit_count += 1 + logger.warning( + f"limit_async: Reinitializing needed (count: {reinit_count})" + ) + else: + reinit_count = 1 # First initialization - # Create initial worker tasks - for _ in range(max_size): + # Check for completed tasks and remove them from the task set + current_tasks = set(tasks) + done_tasks = {t for t in current_tasks if t.done()} + tasks.difference_update(done_tasks) + + # Log active tasks count during reinitialization + active_tasks_count = len(tasks) + if active_tasks_count > 0 and reinit_count > 1: + logger.warning( + f"limit_async: {active_tasks_count} tasks still running during reinitialization" + ) + + # Create initial worker tasks, only adding the number needed + workers_needed = max_size - active_tasks_count + for _ in range(workers_needed): task = asyncio.create_task(worker()) tasks.add(task) task.add_done_callback(tasks.discard) @@ -407,7 +430,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): worker_health_check_task = asyncio.create_task(health_check()) initialized = True - logger.info("limit_async: Worker system initialized") + logger.info(f"limit_async: {workers_needed} new workers initialized") async def shutdown(): """Gracefully shut down all workers and the queue""" @@ -476,7 +499,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): nonlocal counter async with initialization_lock: - current_count = counter + current_count = counter # Use local variable to avoid race conditions counter += 1 # Try to put the task into the queue, supporting timeout @@ -485,6 +508,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): # Use timeout to wait for queue space try: await asyncio.wait_for( + # current_count is used to ensure FIFO order queue.put((_priority, current_count, future, args, kwargs)), timeout=_queue_timeout, ) @@ -494,6 +518,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): ) else: # No timeout, may wait indefinitely + # current_count is used to ensure FIFO order await queue.put((_priority, current_count, future, args, kwargs)) except Exception as e: # Clean up the future From 9f33ff2ecd62c1eca51acdb15a0b4995e03e1eaf Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 29 Apr 2025 13:45:06 +0800 Subject: [PATCH 3/3] Optimize log messages --- lightrag/kg/shared_storage.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index b5d56f5d..6f36f2c4 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -112,11 +112,6 @@ class UnifiedLock(Generic[T]): async def __aexit__(self, exc_type, exc_val, exc_tb): main_lock_released = False try: - # direct_log( - # f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", - # enable_output=self._enable_logging, - # ) - # Release main lock first if self._is_async: self._lock.release() @@ -137,10 +132,6 @@ class UnifiedLock(Generic[T]): enable_output=self._enable_logging, ) - direct_log( - f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", - enable_output=self._enable_logging, - ) except Exception as e: direct_log( f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",