diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index acebafa7..c8c154aa 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -91,7 +91,7 @@ def initialize_share_data(workers: int = 1): master process before forking worker processes, allowing all workers to share the same initialized data. - In single-process mode, this function is called during LightRAG object initialization. + In single-process mode, this function is called in FASTAPI lifespan function. The function determines whether to use cross-process shared variables for data storage based on the number of workers. If workers=1, it uses thread locks and local dictionaries. @@ -105,7 +105,6 @@ def initialize_share_data(workers: int = 1): _manager, \ _workers, \ is_multiprocess, \ - is_multiprocess, \ _storage_lock, \ _internal_lock, \ _pipeline_status_lock, \ @@ -152,6 +151,7 @@ def initialize_share_data(workers: int = 1): async def initialize_pipeline_status(): """ Initialize pipeline namespace with default values. + This function is called during FASTAPI lifespan for each worker. """ pipeline_namespace = await get_namespace_data("pipeline_status") @@ -249,8 +249,8 @@ async def get_all_update_flags_status() -> Dict[str, list]: def try_initialize_namespace(namespace: str) -> bool: """ - Try to initialize a namespace. Returns True if the current process gets initialization permission. - Uses atomic operations on shared dictionaries to ensure only one process can successfully initialize. + Returns True if the current worker(process) gets initialization permission for loading data later. + The worker does not get the permission is prohibited to load data from files. """ global _init_flags, _manager @@ -270,10 +270,10 @@ def try_initialize_namespace(namespace: str) -> bool: async def get_namespace_data(namespace: str) -> Dict[str, Any]: - """get storage space for specific storage type(namespace)""" + """get the shared data reference for specific namespace""" if _shared_dicts is None: direct_log( - f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", + f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", level="ERROR", ) raise ValueError("Shared dictionaries not initialized") @@ -301,10 +301,13 @@ def finalize_share_data(): global \ _manager, \ is_multiprocess, \ - _global_lock, \ + _storage_lock, \ + _internal_lock, \ + _pipeline_status_lock, \ _shared_dicts, \ _init_flags, \ - _initialized + _initialized, \ + _update_flags # Check if already initialized if not _initialized: @@ -320,13 +323,36 @@ def finalize_share_data(): # In multi-process mode, shut down the Manager if is_multiprocess and _manager is not None: try: - # Clear shared dictionaries first + # Clear shared resources before shutting down Manager if _shared_dicts is not None: + # Clear pipeline status history messages first if exists + try: + pipeline_status = _shared_dicts.get("pipeline_status", {}) + if "history_messages" in pipeline_status: + pipeline_status["history_messages"].clear() + except Exception: + pass # Ignore any errors during history messages cleanup _shared_dicts.clear() if _init_flags is not None: _init_flags.clear() + if _update_flags is not None: + # Clear each namespace's update flags list and Value objects + try: + for namespace in _update_flags: + flags_list = _update_flags[namespace] + if isinstance(flags_list, list): + # Clear Value objects in the list + for flag in flags_list: + if hasattr( + flag, "value" + ): # Check if it's a Value object + flag.value = False + flags_list.clear() + except Exception: + pass # Ignore any errors during update flags cleanup + _update_flags.clear() - # Shut down the Manager + # Shut down the Manager - this will automatically clean up all shared resources _manager.shutdown() direct_log(f"Process {os.getpid()} Manager shutdown complete") except Exception as e: @@ -340,6 +366,9 @@ def finalize_share_data(): is_multiprocess = None _shared_dicts = None _init_flags = None - _global_lock = None + _storage_lock = None + _internal_lock = None + _pipeline_status_lock = None + _update_flags = None direct_log(f"Process {os.getpid()} storage data finalization complete")