From 90527875fd74c0eb4ace001ef96d9de71ffa3146 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 9 Mar 2025 15:22:06 +0800 Subject: [PATCH] Fix async issues in namespace init --- lightrag/kg/json_doc_status_impl.py | 2 +- lightrag/kg/json_kv_impl.py | 2 +- lightrag/kg/shared_storage.py | 18 ++++++++++-------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 01c657fa..824bd052 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -33,7 +33,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def initialize(self): """Initialize storage data""" # check need_init must before get_namespace_data - need_init = try_initialize_namespace(self.namespace) + need_init = await try_initialize_namespace(self.namespace) self._data = await get_namespace_data(self.namespace) if need_init: loaded_data = load_json(self._file_name) or {} diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index c0b61a63..96217d4b 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -29,7 +29,7 @@ class JsonKVStorage(BaseKVStorage): async def initialize(self): """Initialize storage data""" # check need_init must before get_namespace_data - need_init = try_initialize_namespace(self.namespace) + need_init = await try_initialize_namespace(self.namespace) self._data = await get_namespace_data(self.namespace) if need_init: loaded_data = load_json(self._file_name) or {} diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 9ccb2a99..68747ff8 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -355,7 +355,7 @@ async def get_all_update_flags_status() -> Dict[str, list]: return result -def try_initialize_namespace(namespace: str) -> bool: +async def try_initialize_namespace(namespace: str) -> bool: """ Returns True if the current worker(process) gets initialization permission for loading data later. The worker does not get the permission is prohibited to load data from files. @@ -365,15 +365,17 @@ def try_initialize_namespace(namespace: str) -> bool: if _init_flags is None: raise ValueError("Try to create nanmespace before Shared-Data is initialized") - if namespace not in _init_flags: - _init_flags[namespace] = True + async with get_internal_lock(): + if namespace not in _init_flags: + _init_flags[namespace] = True + direct_log( + f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]" + ) + return True direct_log( - f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]" + f"Process {os.getpid()} storage namespace already initialized: [{namespace}]" ) - return True - direct_log( - f"Process {os.getpid()} storage namespace already initialized: [{namespace}]" - ) + return False