Fix multiprocess dict creation logic, add process safety locks for namespace creation.

This commit is contained in:
yangdx
2025-02-27 19:03:53 +08:00
parent 92ecb0da97
commit 946095ef80
3 changed files with 50 additions and 48 deletions

View File

@@ -61,7 +61,6 @@ def on_starting(server):
print("Gunicorn initialization complete, forking workers...") print("Gunicorn initialization complete, forking workers...")
print("=" * 80) print("=" * 80)
def on_exit(server): def on_exit(server):
""" """
Executed when Gunicorn is shutting down. Executed when Gunicorn is shutting down.

View File

@@ -50,7 +50,7 @@ def initialize_share_data(workers: int = 1):
# Check if already initialized # Check if already initialized
if _initialized: if _initialized:
direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={is_multiprocess})") direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})")
return return
_manager = Manager() _manager = Manager()
@@ -63,14 +63,14 @@ def initialize_share_data(workers: int = 1):
_shared_dicts = _manager.dict() _shared_dicts = _manager.dict()
_share_objects = _manager.dict() _share_objects = _manager.dict()
_init_flags = _manager.dict() # Use shared dictionary to store initialization flags _init_flags = _manager.dict() # Use shared dictionary to store initialization flags
direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})") direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})")
else: else:
is_multiprocess = False is_multiprocess = False
_global_lock = ThreadLock() _global_lock = ThreadLock()
_shared_dicts = {} _shared_dicts = {}
_share_objects = {} _share_objects = {}
_init_flags = {} _init_flags = {}
direct_log(f"Process {os.getpid()} storage data created for Single Process") direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
# Mark as initialized # Mark as initialized
_initialized = True _initialized = True
@@ -82,28 +82,16 @@ def try_initialize_namespace(namespace: str) -> bool:
""" """
global _init_flags, _manager global _init_flags, _manager
if is_multiprocess: if _init_flags is None:
if _init_flags is None: direct_log(f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
raise RuntimeError( raise ValueError("Shared dictionaries not initialized")
"Shared storage not initialized. Call initialize_share_data() first."
)
else:
if _init_flags is None:
_init_flags = {}
logger.info(f"Process {os.getpid()} trying to initialize namespace {namespace}") if namespace not in _init_flags:
_init_flags[namespace] = True
with _global_lock: direct_log(f"Process {os.getpid()} ready to initialize namespace {namespace}")
if namespace not in _init_flags: return True
_init_flags[namespace] = True direct_log(f"Process {os.getpid()} namespace {namespace} already to initialized")
logger.info( return False
f"Process {os.getpid()} ready to initialize namespace {namespace}"
)
return True
logger.info(
f"Process {os.getpid()} found namespace {namespace} already initialized"
)
return False
def _get_global_lock() -> LockType: def _get_global_lock() -> LockType:
@@ -123,25 +111,36 @@ def get_scan_lock() -> LockType:
def get_namespace_object(namespace: str) -> Any: def get_namespace_object(namespace: str) -> Any:
"""Get an object for specific namespace""" """Get an object for specific namespace"""
if namespace not in _share_objects: if _share_objects is None:
lock = _get_global_lock() direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
with lock: raise ValueError("Shared dictionaries not initialized")
lock = _get_global_lock()
with lock:
if namespace not in _share_objects:
if namespace not in _share_objects: if namespace not in _share_objects:
if is_multiprocess: if is_multiprocess:
_share_objects[namespace] = _manager.Value("O", None) _share_objects[namespace] = _manager.Value("O", None)
else: else:
_share_objects[namespace] = None _share_objects[namespace] = None
direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}")
return _share_objects[namespace] return _share_objects[namespace]
def get_namespace_data(namespace: str) -> Dict[str, Any]: def get_namespace_data(namespace: str) -> Dict[str, Any]:
"""get storage space for specific storage type(namespace)""" """get storage space for specific storage type(namespace)"""
if _shared_dicts is None:
direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
raise ValueError("Shared dictionaries not initialized")
if namespace not in _shared_dicts: lock = _get_global_lock()
lock = _get_global_lock() with lock:
with lock: if namespace not in _shared_dicts:
if namespace not in _shared_dicts: if is_multiprocess and _manager is not None:
_shared_dicts[namespace] = _manager.dict()
else:
_shared_dicts[namespace] = {} _shared_dicts[namespace] = {}
direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}")
return _shared_dicts[namespace] return _shared_dicts[namespace]

View File

@@ -267,25 +267,29 @@ class LightRAG:
_storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED) _storages_status: StoragesStatus = field(default=StoragesStatus.NOT_CREATED)
def __post_init__(self): def __post_init__(self):
from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
initialize_share_data()
need_init = try_initialize_namespace("scan_progress")
scan_progress = get_namespace_data("scan_progress")
if need_init:
scan_progress.update(
{
"is_scanning": False,
"current_file": "",
"indexed_count": 0,
"total_files": 0,
"progress": 0,
}
)
os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True) os.makedirs(os.path.dirname(self.log_file_path), exist_ok=True)
set_logger(self.log_file_path, self.log_level) set_logger(self.log_file_path, self.log_level)
logger.info(f"Logger initialized for working directory: {self.working_dir}") logger.info(f"Logger initialized for working directory: {self.working_dir}")
from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
initialize_share_data()
need_init = try_initialize_namespace("scan_progress")
scan_progress = get_namespace_data("scan_progress")
logger.info(f"scan_progress type after init: {type(scan_progress)}")
scan_progress.update(
{
"is_scanning": False,
"current_file": "",
"indexed_count": 0,
"total_files": 0,
"progress": 0,
}
)
scan_progress = get_namespace_data("scan_progress")
logger.info(f"scan_progress type after update: {type(scan_progress)}")
logger.info(f"Scan_progres value after update: {scan_progress}")
if not os.path.exists(self.working_dir): if not os.path.exists(self.working_dir):
logger.info(f"Creating working directory {self.working_dir}") logger.info(f"Creating working directory {self.working_dir}")
os.makedirs(self.working_dir) os.makedirs(self.working_dir)