From e881bc070947957f92993f6a9ab13ff5c291127b Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 27 Feb 2025 15:36:12 +0800 Subject: [PATCH] simplify process state management by removing redundant multiprocess flag --- lightrag/kg/shared_storage.py | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 8956d995..b4bd5613 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -18,7 +18,6 @@ LockType = Union[ProcessLock, ThreadLock] _manager = None _initialized = None -_is_multiprocess = None is_multiprocess = None # shared data for storage across processes @@ -47,21 +46,18 @@ def initialize_share_data(workers: int = 1): workers (int): Number of worker processes. If 1, single-process mode is used. If > 1, multi-process mode with shared memory is used. """ - global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized + global _manager, is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized # Check if already initialized - if _initialized and _initialized.value: - is_multiprocess = _is_multiprocess.value - direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={_is_multiprocess.value})") + if _initialized: + direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={is_multiprocess})") return _manager = Manager() - _initialized = _manager.Value("b", False) - _is_multiprocess = _manager.Value("b", False) # Force multi-process mode if workers > 1 if workers > 1: - _is_multiprocess.value = True + is_multiprocess = True _global_lock = _manager.Lock() # Create shared dictionaries with manager _shared_dicts = _manager.dict() @@ -69,7 +65,7 @@ def initialize_share_data(workers: int = 1): _init_flags = _manager.dict() # Use shared dictionary to store initialization flags direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})") else: - _is_multiprocess.value = False + is_multiprocess = False _global_lock = ThreadLock() _shared_dicts = {} _share_objects = {} @@ -77,8 +73,7 @@ def initialize_share_data(workers: int = 1): direct_log(f"Process {os.getpid()} storage data created for Single Process") # Mark as initialized - _initialized.value = True - is_multiprocess = _is_multiprocess.value + _initialized = True def try_initialize_namespace(namespace: str) -> bool: """ @@ -87,7 +82,7 @@ def try_initialize_namespace(namespace: str) -> bool: """ global _init_flags, _manager - if _is_multiprocess.value: + if is_multiprocess: if _init_flags is None: raise RuntimeError( "Shared storage not initialized. Call initialize_share_data() first." @@ -132,7 +127,7 @@ def get_namespace_object(namespace: str) -> Any: lock = _get_global_lock() with lock: if namespace not in _share_objects: - if _is_multiprocess.value: + if is_multiprocess: _share_objects[namespace] = _manager.Value("O", None) else: _share_objects[namespace] = None @@ -166,17 +161,17 @@ def finalize_share_data(): In multi-process mode, it shuts down the Manager and releases all shared objects. In single-process mode, it simply resets the global variables. """ - global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized + global _manager, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized # Check if already initialized - if not (_initialized and _initialized.value): + if not _initialized: direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize") return - direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess.value})") + direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})") # In multi-process mode, shut down the Manager - if _is_multiprocess.value and _manager is not None: + if is_multiprocess and _manager is not None: try: # Clear shared dictionaries first if _shared_dicts is not None: @@ -195,7 +190,6 @@ def finalize_share_data(): # Reset global variables _manager = None _initialized = None - _is_multiprocess = None is_multiprocess = None _shared_dicts = None _share_objects = None