Fix async issues in namespace init

This commit is contained in:
yangdx
2025-03-09 15:22:06 +08:00
parent 6a969e8de4
commit 90527875fd
3 changed files with 12 additions and 10 deletions

View File

@@ -33,7 +33,7 @@ class JsonDocStatusStorage(DocStatusStorage):
async def initialize(self): async def initialize(self):
"""Initialize storage data""" """Initialize storage data"""
# check need_init must before get_namespace_data # check need_init must before get_namespace_data
need_init = try_initialize_namespace(self.namespace) need_init = await try_initialize_namespace(self.namespace)
self._data = await get_namespace_data(self.namespace) self._data = await get_namespace_data(self.namespace)
if need_init: if need_init:
loaded_data = load_json(self._file_name) or {} loaded_data = load_json(self._file_name) or {}

View File

@@ -29,7 +29,7 @@ class JsonKVStorage(BaseKVStorage):
async def initialize(self): async def initialize(self):
"""Initialize storage data""" """Initialize storage data"""
# check need_init must before get_namespace_data # check need_init must before get_namespace_data
need_init = try_initialize_namespace(self.namespace) need_init = await try_initialize_namespace(self.namespace)
self._data = await get_namespace_data(self.namespace) self._data = await get_namespace_data(self.namespace)
if need_init: if need_init:
loaded_data = load_json(self._file_name) or {} loaded_data = load_json(self._file_name) or {}

View File

@@ -355,7 +355,7 @@ async def get_all_update_flags_status() -> Dict[str, list]:
return result return result
def try_initialize_namespace(namespace: str) -> bool: async def try_initialize_namespace(namespace: str) -> bool:
""" """
Returns True if the current worker(process) gets initialization permission for loading data later. Returns True if the current worker(process) gets initialization permission for loading data later.
The worker does not get the permission is prohibited to load data from files. The worker does not get the permission is prohibited to load data from files.
@@ -365,15 +365,17 @@ def try_initialize_namespace(namespace: str) -> bool:
if _init_flags is None: if _init_flags is None:
raise ValueError("Try to create nanmespace before Shared-Data is initialized") raise ValueError("Try to create nanmespace before Shared-Data is initialized")
if namespace not in _init_flags: async with get_internal_lock():
_init_flags[namespace] = True if namespace not in _init_flags:
_init_flags[namespace] = True
direct_log(
f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]"
)
return True
direct_log( direct_log(
f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]" f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
) )
return True
direct_log(
f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
)
return False return False