From 20d65ae55491eff365f73b92ab9d79807101823f Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 21 Mar 2025 16:08:23 +0800
Subject: [PATCH] feat(shared_storage): prevent event loop blocking in
 multiprocess mode

Add auxiliary async locks in multiprocess mode to prevent event loop blocking
---
 lightrag/kg/shared_storage.py | 87 ++++++++++++++++++++++++++++++++++-
 1 file changed, 85 insertions(+), 2 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 736887a6..d5006d62 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -41,6 +41,9 @@ _pipeline_status_lock: Optional[LockType] = None
 _graph_db_lock: Optional[LockType] = None
 _data_init_lock: Optional[LockType] = None
 
+# async locks for coroutine synchronization in multiprocess mode
+_async_locks: Optional[Dict[str, asyncio.Lock]] = None
+
 
 class UnifiedLock(Generic[T]):
     """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -51,12 +54,14 @@
         is_async: bool,
         name: str = "unnamed",
         enable_logging: bool = True,
+        async_lock: Optional[asyncio.Lock] = None,
     ):
         self._lock = lock
         self._is_async = is_async
         self._pid = os.getpid()  # for debug only
         self._name = name  # for debug only
         self._enable_logging = enable_logging  # for debug only
+        self._async_lock = async_lock  # auxiliary lock for coroutine synchronization
 
     async def __aenter__(self) -> "UnifiedLock[T]":
         try:
@@ -64,16 +69,35 @@
                 f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
+
+            # If in multiprocess mode and async lock exists, acquire it first
+            if not self._is_async and self._async_lock is not None:
+                direct_log(
+                    f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
+                    enable_output=self._enable_logging,
+                )
+                await self._async_lock.acquire()
+                direct_log(
+                    f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
+                    enable_output=self._enable_logging,
+                )
+
+            # Then acquire the main lock
             if self._is_async:
                 await self._lock.acquire()
             else:
                 self._lock.acquire()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
             return self
         except Exception as e:
+            # If main lock acquisition fails, release the async lock if it was acquired
+            if not self._is_async and self._async_lock is not None and self._async_lock.locked():
+                self._async_lock.release()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                 level="ERROR",
                 enable_output=self._enable_logging,
             )
             raise
@@ -82,15 +106,29 @@
     async def __aexit__(self, exc_type, exc_val, exc_tb):
+        main_lock_released = False
         try:
             direct_log(
                 f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
+
+            # Release main lock first
             if self._is_async:
                 self._lock.release()
             else:
                 self._lock.release()
+
+            main_lock_released = True
+
+            # Then release async lock if in multiprocess mode
+            if not self._is_async and self._async_lock is not None:
+                direct_log(
+                    f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'",
+                    enable_output=self._enable_logging,
+                )
+                self._async_lock.release()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                 enable_output=self._enable_logging,
@@ -101,6 +139,27 @@
                 level="ERROR",
                 enable_output=self._enable_logging,
             )
+
+            # If main lock release failed but async lock hasn't been released, try to release it
+            if not main_lock_released and not self._is_async and self._async_lock is not None:
+                try:
+                    direct_log(
+                        f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
+                        level="WARNING",
+                        enable_output=self._enable_logging,
+                    )
+                    self._async_lock.release()
+                    direct_log(
+                        f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
+                        enable_output=self._enable_logging,
+                    )
+                except Exception as inner_e:
+                    direct_log(
+                        f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
+                        level="ERROR",
+                        enable_output=self._enable_logging,
+                    )
+
             raise
 
     def __enter__(self) -> "UnifiedLock[T]":
@@ -151,51 +210,61 @@
 
 def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("internal_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_internal_lock,
         is_async=not is_multiprocess,
         name="internal_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("storage_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_storage_lock,
         is_async=not is_multiprocess,
         name="storage_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_pipeline_status_lock,
         is_async=not is_multiprocess,
         name="pipeline_status_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
    )
 
 
 def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified graph database lock for ensuring atomic operations"""
+    async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_graph_db_lock,
         is_async=not is_multiprocess,
         name="graph_db_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified data initialization lock for ensuring atomic data initialization"""
+    async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_data_init_lock,
         is_async=not is_multiprocess,
         name="data_init_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
@@ -229,7 +298,8 @@ def initialize_share_data(workers: int = 1):
         _shared_dicts, \
         _init_flags, \
         _initialized, \
-        _update_flags
+        _update_flags, \
+        _async_locks
 
     # Check if already initialized
     if _initialized:
@@ -251,6 +321,16 @@
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
+
+        # Initialize async locks for multiprocess mode
+        _async_locks = {
+            "internal_lock": asyncio.Lock(),
+            "storage_lock": asyncio.Lock(),
+            "pipeline_status_lock": asyncio.Lock(),
+            "graph_db_lock": asyncio.Lock(),
+            "data_init_lock": asyncio.Lock(),
+        }
+
         direct_log(
             f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
         )
@@ -264,6 +344,7 @@
         _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}
+        _async_locks = None  # No need for async locks in single process mode
         direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
 
     # Mark as initialized
@@ -458,7 +539,8 @@
         _shared_dicts, \
         _init_flags, \
         _initialized, \
-        _update_flags
+        _update_flags, \
+        _async_locks
 
     # Check if already initialized
     if not _initialized:
@@ -523,5 +605,6 @@
     _graph_db_lock = None
     _data_init_lock = None
     _update_flags = None
+    _async_locks = None
 
     direct_log(f"Process {os.getpid()} storage data finalization complete")