diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index e05c04f6..67a4705a 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -15,6 +15,7 @@ from lightrag.utils import (
 from .shared_storage import (
     get_namespace_data,
     get_storage_lock,
+    get_data_init_lock,
     try_initialize_namespace,
 )

@@ -27,21 +28,22 @@ class JsonDocStatusStorage(DocStatusStorage):
     def __post_init__(self):
         working_dir = self.global_config["working_dir"]
         self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
-        self._storage_lock = get_storage_lock()
         self._data = None

     async def initialize(self):
         """Initialize storage data"""
-        # check need_init must before get_namespace_data
-        need_init = await try_initialize_namespace(self.namespace)
+        self._storage_lock = get_storage_lock()
         self._data = await get_namespace_data(self.namespace)
-        if need_init:
-            loaded_data = load_json(self._file_name) or {}
-            async with self._storage_lock:
-                self._data.update(loaded_data)
-            logger.info(
-                f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
-            )
+        async with get_data_init_lock():
+            # check need_init must before get_namespace_data
+            need_init = await try_initialize_namespace(self.namespace)
+            if need_init:
+                loaded_data = load_json(self._file_name) or {}
+                async with self._storage_lock:
+                    self._data.update(loaded_data)
+                logger.info(
+                    f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
+                )

     async def filter_keys(self, keys: set[str]) -> set[str]:
         """Return keys that should be processed (not in storage or not successfully processed)"""
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index c0aa81b2..5070c0b4 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -13,6 +13,7 @@ from lightrag.utils import (
 from .shared_storage import (
     get_namespace_data,
     get_storage_lock,
+    get_data_init_lock,
     try_initialize_namespace,
 )

@@ -23,29 +24,30 @@ class JsonKVStorage(BaseKVStorage):
     def __post_init__(self):
         working_dir = self.global_config["working_dir"]
         self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
-        self._storage_lock = get_storage_lock()
         self._data = None

     async def initialize(self):
         """Initialize storage data"""
-        # check need_init must before get_namespace_data
-        need_init = await try_initialize_namespace(self.namespace)
+        self._storage_lock = get_storage_lock()
         self._data = await get_namespace_data(self.namespace)
-        if need_init:
-            loaded_data = load_json(self._file_name) or {}
-            async with self._storage_lock:
-                self._data.update(loaded_data)
-
-            # Calculate data count based on namespace
-            if self.namespace.endswith("cache"):
-                # For cache namespaces, sum the cache entries across all cache types
-                data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
-                               if isinstance(first_level_dict, dict))
-            else:
-                # For non-cache namespaces, use the original count method
-                data_count = len(loaded_data)
-
-            logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")
+        async with get_data_init_lock():
+            # check need_init must before get_namespace_data
+            need_init = await try_initialize_namespace(self.namespace)
+            if need_init:
+                loaded_data = load_json(self._file_name) or {}
+                async with self._storage_lock:
+                    self._data.update(loaded_data)
+
+                # Calculate data count based on namespace
+                if self.namespace.endswith("cache"):
+                    # For cache namespaces, sum the cache entries across all cache types
+                    data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
+                                   if isinstance(first_level_dict, dict))
+                else:
+                    # For non-cache namespaces, use the original count method
+                    data_count = len(loaded_data)
+
+                logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")

     async def index_done_callback(self) -> None:
         async with self._storage_lock:
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 68747ff8..e3c25d34 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -39,6 +39,7 @@ _storage_lock: Optional[LockType] = None
 _internal_lock: Optional[LockType] = None
 _pipeline_status_lock: Optional[LockType] = None
 _graph_db_lock: Optional[LockType] = None
+_data_init_lock: Optional[LockType] = None


 class UnifiedLock(Generic[T]):
@@ -188,6 +189,16 @@ def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
     )


+def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
+    """return unified data initialization lock for ensuring atomic data initialization"""
+    return UnifiedLock(
+        lock=_data_init_lock,
+        is_async=not is_multiprocess,
+        name="data_init_lock",
+        enable_logging=enable_logging,
+    )
+
+
 def initialize_share_data(workers: int = 1):
     """
     Initialize shared storage data for single or multi-process mode.
@@ -214,6 +225,7 @@ def initialize_share_data(workers: int = 1):
         _internal_lock, \
         _pipeline_status_lock, \
         _graph_db_lock, \
+        _data_init_lock, \
         _shared_dicts, \
         _init_flags, \
         _initialized, \
@@ -226,15 +238,16 @@ def initialize_share_data(workers: int = 1):
         )
         return

-    _manager = Manager()
     _workers = workers

     if workers > 1:
         is_multiprocess = True
+        _manager = Manager()
         _internal_lock = _manager.Lock()
         _storage_lock = _manager.Lock()
         _pipeline_status_lock = _manager.Lock()
         _graph_db_lock = _manager.Lock()
+        _data_init_lock = _manager.Lock()
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
@@ -247,6 +260,7 @@ def initialize_share_data(workers: int = 1):
         _storage_lock = asyncio.Lock()
         _pipeline_status_lock = asyncio.Lock()
         _graph_db_lock = asyncio.Lock()
+        _data_init_lock = asyncio.Lock()
         _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}
@@ -415,6 +429,7 @@ def finalize_share_data():
         _internal_lock, \
         _pipeline_status_lock, \
         _graph_db_lock, \
+        _data_init_lock, \
         _shared_dicts, \
         _init_flags, \
         _initialized, \
@@ -481,6 +496,7 @@ def finalize_share_data():
     _internal_lock = None
     _pipeline_status_lock = None
     _graph_db_lock = None
+    _data_init_lock = None
     _update_flags = None

     direct_log(f"Process {os.getpid()} storage data finalization complete")
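The common thread of the patch is that the check-then-load sequence in initialize() (try_initialize_namespace() followed by reading the JSON file into the shared namespace dict) is now serialized under a new get_data_init_lock(), so two concurrent initializers cannot both see need_init as true. Below is a minimal, self-contained sketch of that pattern using plain asyncio primitives. The names _namespaces, _init_flags, load_from_disk and the initialize() function here are simplified stand-ins for illustration only, not LightRAG's actual shared_storage API, which also switches to multiprocessing.Manager locks when workers > 1.

    import asyncio

    # Stand-ins (assumptions) for the shared_storage module's state.
    _namespaces: dict[str, dict] = {}   # stand-in for get_namespace_data()
    _init_flags: dict[str, bool] = {}   # stand-in for the namespace init flags
    _data_init_lock = asyncio.Lock()    # stand-in for get_data_init_lock()
    _storage_lock = asyncio.Lock()      # stand-in for get_storage_lock()


    def try_initialize_namespace(namespace: str) -> bool:
        """Return True only for the first caller that claims the namespace."""
        if namespace not in _init_flags:
            _init_flags[namespace] = True
            return True
        return False


    async def initialize(namespace: str, load_from_disk) -> dict:
        data = _namespaces.setdefault(namespace, {})
        # The data-init lock makes the check and the load one atomic step,
        # so only the first initializer performs the file load.
        async with _data_init_lock:
            if try_initialize_namespace(namespace):
                loaded = load_from_disk() or {}
                async with _storage_lock:  # same lock normal writers use
                    data.update(loaded)
        return data


    async def main():
        # Two concurrent initializers of the same namespace: one load happens.
        results = await asyncio.gather(
            initialize("kv_store_full_docs", lambda: {"doc-1": {"content": "..."}}),
            initialize("kv_store_full_docs", lambda: {"doc-1": {"content": "..."}}),
        )
        print(results[0] is results[1], results[0])


    asyncio.run(main())

Without the outer lock, both callers could pass the need_init check before either had loaded the file, which is the duplicate-load race the dedicated data_init_lock closes.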