diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index ca0958ee..f5f9f8ea 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -138,17 +138,17 @@ def create_app(args):
         # Import necessary functions from shared_storage
         from lightrag.kg.shared_storage import (
             get_namespace_data,
-            get_storage_lock,
-            initialize_pipeline_namespace,
+            get_pipeline_status_lock,
+            initialize_pipeline_status,
         )
-        await initialize_pipeline_namespace()
+        await initialize_pipeline_status()
 
         # Auto scan documents if enabled
         if args.auto_scan_at_startup:
             # Check if a task is already running (with lock protection)
             pipeline_status = await get_namespace_data("pipeline_status")
             should_start_task = False
-            async with get_storage_lock():
+            async with get_pipeline_status_lock():
                 if not pipeline_status.get("busy", False):
                     should_start_task = True
             # Only start the task if no other task is running
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 940d0e7b..237ed302 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -16,6 +16,22 @@ def direct_log(message, level="INFO"):
 
 T = TypeVar('T')
+LockType = Union[ProcessLock, asyncio.Lock]
+
+is_multiprocess = None
+_workers = None
+_manager = None
+_initialized = None
+
+# shared data for storage across processes
+_shared_dicts: Optional[Dict[str, Any]] = None
+_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
+_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+
+# locks for mutex access
+_storage_lock: Optional[LockType] = None
+_internal_lock: Optional[LockType] = None
+_pipeline_status_lock: Optional[LockType] = None
 
 class UnifiedLock(Generic[T]):
     """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -39,30 +55,37 @@ class UnifiedLock(Generic[T]):
     def __enter__(self) -> 'UnifiedLock[T]':
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.acquire()
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.release()
 
 
-LockType = Union[ProcessLock, asyncio.Lock]
+def get_internal_lock() -> UnifiedLock:
+    """return unified internal lock for shared-data access"""
+    return UnifiedLock(
+        lock=_internal_lock,
+        is_async=not is_multiprocess
+    )
 
-is_multiprocess = None
-_workers = None
-_manager = None
-_initialized = None
-_global_lock: Optional[LockType] = None
-
-# shared data for storage across processes
-_shared_dicts: Optional[Dict[str, Any]] = None
-_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
-_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+def get_storage_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_storage_lock,
+        is_async=not is_multiprocess
+    )
 
+def get_pipeline_status_lock() -> UnifiedLock:
+    """return unified lock for pipeline status access"""
+    return UnifiedLock(
+        lock=_pipeline_status_lock,
+        is_async=not is_multiprocess
+    )
 
 def initialize_share_data(workers: int = 1):
     """
@@ -87,7 +110,9 @@ def initialize_share_data(workers: int = 1):
         _workers, \
         is_multiprocess, \
         is_multiprocess, \
-        _global_lock, \
+        _storage_lock, \
+        _internal_lock, \
+        _pipeline_status_lock, \
         _shared_dicts, \
         _init_flags, \
         _initialized, \
@@ -105,7 +130,9 @@
 
     if workers > 1:
         is_multiprocess = True
-        _global_lock = _manager.Lock()
+        _internal_lock = _manager.Lock()
+        _storage_lock = _manager.Lock()
+        _pipeline_status_lock = _manager.Lock()
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
@@ -114,7 +141,9 @@
         )
     else:
         is_multiprocess = False
-        _global_lock = asyncio.Lock()
+        _internal_lock = asyncio.Lock()
+        _storage_lock = asyncio.Lock()
+        _pipeline_status_lock = asyncio.Lock()
         _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}
@@ -124,13 +153,13 @@
     _initialized = True
 
 
-async def initialize_pipeline_namespace():
+async def initialize_pipeline_status():
     """
     Initialize pipeline namespace with default values.
    """
     pipeline_namespace = await get_namespace_data("pipeline_status")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         # Check if already initialized by checking for required fields
         if "busy" in pipeline_namespace:
             return
@@ -160,7 +189,7 @@ async def get_update_flag(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             if is_multiprocess and _manager is not None:
                 _update_flags[namespace] = _manager.list()
@@ -182,7 +211,7 @@ async def set_all_update_flags(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             raise ValueError(f"Namespace {namespace} not found in update flags")
         # Update flags for both modes
@@ -215,14 +244,6 @@ def try_initialize_namespace(namespace: str) -> bool:
     return False
 
 
-def get_storage_lock() -> UnifiedLock:
-    """return unified storage lock for data consistency"""
-    return UnifiedLock(
-        lock=_global_lock,
-        is_async=not is_multiprocess
-    )
-
-
 async def get_namespace_data(namespace: str) -> Dict[str, Any]:
     """get storage space for specific storage type(namespace)"""
     if _shared_dicts is None:
@@ -232,7 +253,7 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
         )
         raise ValueError("Shared dictionaries not initialized")
 
-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _shared_dicts:
             if is_multiprocess and _manager is not None:
                 _shared_dicts[namespace] = _manager.dict()
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 4b85a3b7..e7420a35 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -669,15 +669,15 @@
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
-        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
 
         # Get pipeline status shared data and lock
         pipeline_status = await get_namespace_data("pipeline_status")
-        storage_lock = get_storage_lock()
+        pipeline_status_lock = get_pipeline_status_lock()
 
         # Check if another process is already processing the queue
         process_documents = False
-        async with storage_lock:
+        async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
                 # Cleaning history_messages without breaking it as a shared list object
@@ -851,7 +851,7 @@
 
                 # Check if there's a pending request to process more documents (with lock)
                 has_pending_request = False
-                async with storage_lock:
+                async with pipeline_status_lock:
                     has_pending_request = pipeline_status.get("request_pending", False)
                     if has_pending_request:
                         # Clear the request flag before checking for more documents
@@ -869,7 +869,7 @@
             log_message = "Document processing pipeline completed"
             logger.info(log_message)
             # Always reset busy status when done or if an exception occurs (with lock)
-            async with storage_lock:
+            async with pipeline_status_lock:
                 pipeline_status["busy"] = False
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
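With `_global_lock` split into `_internal_lock`, `_storage_lock`, and `_pipeline_status_lock`, callers that coordinate the document pipeline switch from `get_storage_lock()` to `get_pipeline_status_lock()`, while namespace bookkeeping inside `shared_storage` moves to `get_internal_lock()`. Below is a minimal usage sketch of the patched API, assuming a single-worker (pure asyncio) deployment; the `main` coroutine and the status message are illustrative and not part of the patch:

```python
import asyncio

from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_pipeline_status_lock,
    initialize_pipeline_status,
    initialize_share_data,
)


async def main() -> None:
    # Shared data and the pipeline namespace must exist before any
    # lock or namespace access; workers=1 selects asyncio locks.
    initialize_share_data(workers=1)
    await initialize_pipeline_status()

    pipeline_status = await get_namespace_data("pipeline_status")

    # Claim the pipeline under the dedicated pipeline_status lock
    # (previously this went through the single global storage lock).
    async with get_pipeline_status_lock():
        if pipeline_status.get("busy", False):
            return  # another worker already owns the pipeline
        pipeline_status["busy"] = True
        pipeline_status["latest_message"] = "claimed the pipeline"  # illustrative

    try:
        ...  # process documents here
    finally:
        # Always clear the busy flag, mirroring the finally-block
        # reset in the lightrag.py hunk above.
        async with get_pipeline_status_lock():
            pipeline_status["busy"] = False


asyncio.run(main())
```

Because namespace creation in `get_namespace_data` now takes `_internal_lock` rather than the shared global lock, a worker holding the pipeline status lock no longer blocks unrelated namespace lookups, which appears to be the point of the split.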