From 95c06f1bde92bb5ced2c2a2536b2c304a414db0a Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Mar 2025 22:36:41 +0800 Subject: [PATCH] Add graph DB lock to shared storage system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Introduced new graph_db_lock • Added detailed lock debugging output --- lightrag/kg/shared_storage.py | 94 +++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index c8c154aa..67206971 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -7,12 +7,18 @@ from typing import Any, Dict, Optional, Union, TypeVar, Generic # Define a direct print function for critical logs that must be visible in all processes -def direct_log(message, level="INFO"): +def direct_log(message, level="INFO", enable_output: bool = True): """ Log a message directly to stderr to ensure visibility in all processes, including the Gunicorn master process. + + Args: + message: The message to log + level: Log level (default: "INFO") + enable_output: Whether to actually output the log (default: True) """ - print(f"{level}: {message}", file=sys.stderr, flush=True) + if enable_output: + print(f"{level}: {message}", file=sys.stderr, flush=True) T = TypeVar("T") @@ -32,55 +38,88 @@ _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated _storage_lock: Optional[LockType] = None _internal_lock: Optional[LockType] = None _pipeline_status_lock: Optional[LockType] = None +_graph_db_lock: Optional[LockType] = None class UnifiedLock(Generic[T]): """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock""" - def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool): + def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool, name: str = "unnamed", enable_logging: bool = True): self._lock = lock self._is_async = is_async + self._pid = os.getpid() # for debug only + self._name = name # for debug only + self._enable_logging = enable_logging # for debug only async def __aenter__(self) -> "UnifiedLock[T]": - if self._is_async: - await self._lock.acquire() - else: - self._lock.acquire() - return self + try: + direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging) + if self._is_async: + await self._lock.acquire() + else: + self._lock.acquire() + direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", enable_output=self._enable_logging) + return self + except Exception as e: + direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging) + raise async def __aexit__(self, exc_type, exc_val, exc_tb): - if self._is_async: - self._lock.release() - else: - self._lock.release() + try: + direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging) + if self._is_async: + self._lock.release() + else: + self._lock.release() + direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", enable_output=self._enable_logging) + except Exception as e: + direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging) + raise def __enter__(self) -> "UnifiedLock[T]": """For backward compatibility""" - if self._is_async: - raise RuntimeError("Use 'async with' for shared_storage lock") - self._lock.acquire() - return self + try: + if self._is_async: + raise RuntimeError("Use 'async with' for shared_storage lock") + direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)", enable_output=self._enable_logging) + self._lock.acquire() + direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)", enable_output=self._enable_logging) + return self + except Exception as e: + direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging) + raise def __exit__(self, exc_type, exc_val, exc_tb): """For backward compatibility""" - if self._is_async: - raise RuntimeError("Use 'async with' for shared_storage lock") - self._lock.release() + try: + if self._is_async: + raise RuntimeError("Use 'async with' for shared_storage lock") + direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)", enable_output=self._enable_logging) + self._lock.release() + direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)", enable_output=self._enable_logging) + except Exception as e: + direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging) + raise -def get_internal_lock() -> UnifiedLock: +def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" - return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess) + return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess, name="internal_lock", enable_logging=enable_logging) -def get_storage_lock() -> UnifiedLock: +def get_storage_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" - return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess) + return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess, name="storage_lock", enable_logging=enable_logging) -def get_pipeline_status_lock() -> UnifiedLock: +def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" - return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess) + return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess, name="pipeline_status_lock", enable_logging=enable_logging) + + +def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: + """return unified graph database lock for ensuring atomic operations""" + return UnifiedLock(lock=_graph_db_lock, is_async=not is_multiprocess, name="graph_db_lock", enable_logging=enable_logging) def initialize_share_data(workers: int = 1): @@ -108,6 +147,7 @@ def initialize_share_data(workers: int = 1): _storage_lock, \ _internal_lock, \ _pipeline_status_lock, \ + _graph_db_lock, \ _shared_dicts, \ _init_flags, \ _initialized, \ @@ -128,6 +168,7 @@ def initialize_share_data(workers: int = 1): _internal_lock = _manager.Lock() _storage_lock = _manager.Lock() _pipeline_status_lock = _manager.Lock() + _graph_db_lock = _manager.Lock() _shared_dicts = _manager.dict() _init_flags = _manager.dict() _update_flags = _manager.dict() @@ -139,6 +180,7 @@ def initialize_share_data(workers: int = 1): _internal_lock = asyncio.Lock() _storage_lock = asyncio.Lock() _pipeline_status_lock = asyncio.Lock() + _graph_db_lock = asyncio.Lock() _shared_dicts = {} _init_flags = {} _update_flags = {} @@ -304,6 +346,7 @@ def finalize_share_data(): _storage_lock, \ _internal_lock, \ _pipeline_status_lock, \ + _graph_db_lock, \ _shared_dicts, \ _init_flags, \ _initialized, \ @@ -369,6 +412,7 @@ def finalize_share_data(): _storage_lock = None _internal_lock = None _pipeline_status_lock = None + _graph_db_lock = None _update_flags = None direct_log(f"Process {os.getpid()} storage data finalization complete")