diff --git a/gunicorn_config.py b/gunicorn_config.py index 8c1b22bf..f4c9178e 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -61,7 +61,6 @@ def on_starting(server): print("Gunicorn initialization complete, forking workers...") print("=" * 80) - def on_exit(server): """ Executed when Gunicorn is shutting down. diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index b4bd5613..73ffb306 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -50,7 +50,7 @@ def initialize_share_data(workers: int = 1): # Check if already initialized if _initialized: - direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={is_multiprocess})") + direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})") return _manager = Manager() @@ -63,14 +63,14 @@ def initialize_share_data(workers: int = 1): _shared_dicts = _manager.dict() _share_objects = _manager.dict() _init_flags = _manager.dict() # Use shared dictionary to store initialization flags - direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})") + direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})") else: is_multiprocess = False _global_lock = ThreadLock() _shared_dicts = {} _share_objects = {} _init_flags = {} - direct_log(f"Process {os.getpid()} storage data created for Single Process") + direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") # Mark as initialized _initialized = True @@ -82,28 +82,16 @@ def try_initialize_namespace(namespace: str) -> bool: """ global _init_flags, _manager - if is_multiprocess: - if _init_flags is None: - raise RuntimeError( - "Shared storage not initialized. Call initialize_share_data() first." - ) - else: - if _init_flags is None: - _init_flags = {} + if _init_flags is None: + direct_log(f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") + raise ValueError("Shared dictionaries not initialized") - logger.info(f"Process {os.getpid()} trying to initialize namespace {namespace}") - - with _global_lock: - if namespace not in _init_flags: - _init_flags[namespace] = True - logger.info( - f"Process {os.getpid()} ready to initialize namespace {namespace}" - ) - return True - logger.info( - f"Process {os.getpid()} found namespace {namespace} already initialized" - ) - return False + if namespace not in _init_flags: + _init_flags[namespace] = True + direct_log(f"Process {os.getpid()} ready to initialize namespace {namespace}") + return True + direct_log(f"Process {os.getpid()} namespace {namespace} already to initialized") + return False def _get_global_lock() -> LockType: @@ -123,26 +111,37 @@ def get_scan_lock() -> LockType: def get_namespace_object(namespace: str) -> Any: """Get an object for specific namespace""" - if namespace not in _share_objects: - lock = _get_global_lock() - with lock: + if _share_objects is None: + direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") + raise ValueError("Shared dictionaries not initialized") + + lock = _get_global_lock() + with lock: + if namespace not in _share_objects: if namespace not in _share_objects: if is_multiprocess: _share_objects[namespace] = _manager.Value("O", None) else: _share_objects[namespace] = None + direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}") return _share_objects[namespace] def get_namespace_data(namespace: str) -> Dict[str, Any]: """get storage space for specific storage type(namespace)""" + if _shared_dicts is None: + direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") + raise ValueError("Shared dictionaries not initialized") - if namespace not in _shared_dicts: - lock = _get_global_lock() - with lock: - if namespace not in _shared_dicts: + lock = _get_global_lock() + with lock: + if namespace not in _shared_dicts: + if is_multiprocess and _manager is not None: + _shared_dicts[namespace] = _manager.dict() + else: _shared_dicts[namespace] = {} - + direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}") + return _shared_dicts[namespace] diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ec0accc3..924fbae3 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -267,25 +267,29 @@ class LightRAG: _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED) def __post_init__(self): - from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data - initialize_share_data() - need_init = try_initialize_namespace("scan_progress") - scan_progress = get_namespace_data("scan_progress") - if need_init: - scan_progress.update( - { - "is_scanning": False, - "current_file": "", - "indexed_count": 0, - "total_files": 0, - "progress": 0, - } - ) - os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True) set_logger(self.log_file_path, self.log_level) logger.info(f"Logger initialized for working directory: {self.working_dir}") + from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data + initialize_share_data() + + need_init = try_initialize_namespace("scan_progress") + scan_progress = get_namespace_data("scan_progress") + logger.info(f"scan_progress type after init: {type(scan_progress)}") + scan_progress.update( + { + "is_scanning": False, + "current_file": "", + "indexed_count": 0, + "total_files": 0, + "progress": 0, + } + ) + scan_progress = get_namespace_data("scan_progress") + logger.info(f"scan_progress type after update: {type(scan_progress)}") + logger.info(f"Scan_progres value after update: {scan_progress}") + if not os.path.exists(self.working_dir): logger.info(f"Creating working directory {self.working_dir}") os.makedirs(self.working_dir)