Improve shared storage cleanup and clarify initialization in multi-worker setup
This commit is contained in:
@@ -91,7 +91,7 @@ def initialize_share_data(workers: int = 1):
|
|||||||
master process before forking worker processes, allowing all workers to share
|
master process before forking worker processes, allowing all workers to share
|
||||||
the same initialized data.
|
the same initialized data.
|
||||||
|
|
||||||
In single-process mode, this function is called during LightRAG object initialization.
|
In single-process mode, this function is called in the FastAPI lifespan function.
|
||||||
|
|
||||||
The function determines whether to use cross-process shared variables for data storage
|
The function determines whether to use cross-process shared variables for data storage
|
||||||
based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
|
based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
|
||||||
@@ -105,7 +105,6 @@ def initialize_share_data(workers: int = 1):
|
|||||||
_manager, \
|
_manager, \
|
||||||
_workers, \
|
_workers, \
|
||||||
is_multiprocess, \
|
is_multiprocess, \
|
||||||
is_multiprocess, \
|
|
||||||
_storage_lock, \
|
_storage_lock, \
|
||||||
_internal_lock, \
|
_internal_lock, \
|
||||||
_pipeline_status_lock, \
|
_pipeline_status_lock, \
|
||||||
@@ -152,6 +151,7 @@ def initialize_share_data(workers: int = 1):
|
|||||||
async def initialize_pipeline_status():
|
async def initialize_pipeline_status():
|
||||||
"""
|
"""
|
||||||
Initialize pipeline namespace with default values.
|
Initialize pipeline namespace with default values.
|
||||||
|
This function is called during the FastAPI lifespan for each worker.
|
||||||
"""
|
"""
|
||||||
pipeline_namespace = await get_namespace_data("pipeline_status")
|
pipeline_namespace = await get_namespace_data("pipeline_status")
|
||||||
|
|
||||||
@@ -249,8 +249,8 @@ async def get_all_update_flags_status() -> Dict[str, list]:
|
|||||||
|
|
||||||
def try_initialize_namespace(namespace: str) -> bool:
|
def try_initialize_namespace(namespace: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Try to initialize a namespace. Returns True if the current process gets initialization permission.
|
Returns True if the current worker (process) gets initialization permission for loading data later.
|
||||||
Uses atomic operations on shared dictionaries to ensure only one process can successfully initialize.
|
A worker that does not get the permission is prohibited from loading data from files.
|
||||||
"""
|
"""
|
||||||
global _init_flags, _manager
|
global _init_flags, _manager
|
||||||
|
|
||||||
@@ -270,10 +270,10 @@ def try_initialize_namespace(namespace: str) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
async def get_namespace_data(namespace: str) -> Dict[str, Any]:
|
async def get_namespace_data(namespace: str) -> Dict[str, Any]:
|
||||||
"""get storage space for specific storage type(namespace)"""
|
"""get the shared data reference for specific namespace"""
|
||||||
if _shared_dicts is None:
|
if _shared_dicts is None:
|
||||||
direct_log(
|
direct_log(
|
||||||
f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}",
|
f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}",
|
||||||
level="ERROR",
|
level="ERROR",
|
||||||
)
|
)
|
||||||
raise ValueError("Shared dictionaries not initialized")
|
raise ValueError("Shared dictionaries not initialized")
|
||||||
@@ -301,10 +301,13 @@ def finalize_share_data():
|
|||||||
global \
|
global \
|
||||||
_manager, \
|
_manager, \
|
||||||
is_multiprocess, \
|
is_multiprocess, \
|
||||||
_global_lock, \
|
_storage_lock, \
|
||||||
|
_internal_lock, \
|
||||||
|
_pipeline_status_lock, \
|
||||||
_shared_dicts, \
|
_shared_dicts, \
|
||||||
_init_flags, \
|
_init_flags, \
|
||||||
_initialized
|
_initialized, \
|
||||||
|
_update_flags
|
||||||
|
|
||||||
# Check if already initialized
|
# Check if already initialized
|
||||||
if not _initialized:
|
if not _initialized:
|
||||||
@@ -320,13 +323,36 @@ def finalize_share_data():
|
|||||||
# In multi-process mode, shut down the Manager
|
# In multi-process mode, shut down the Manager
|
||||||
if is_multiprocess and _manager is not None:
|
if is_multiprocess and _manager is not None:
|
||||||
try:
|
try:
|
||||||
# Clear shared dictionaries first
|
# Clear shared resources before shutting down Manager
|
||||||
if _shared_dicts is not None:
|
if _shared_dicts is not None:
|
||||||
|
# Clear pipeline status history messages first, if present
|
||||||
|
try:
|
||||||
|
pipeline_status = _shared_dicts.get("pipeline_status", {})
|
||||||
|
if "history_messages" in pipeline_status:
|
||||||
|
pipeline_status["history_messages"].clear()
|
||||||
|
except Exception:
|
||||||
|
pass # Ignore any errors during history messages cleanup
|
||||||
_shared_dicts.clear()
|
_shared_dicts.clear()
|
||||||
if _init_flags is not None:
|
if _init_flags is not None:
|
||||||
_init_flags.clear()
|
_init_flags.clear()
|
||||||
|
if _update_flags is not None:
|
||||||
|
# Clear each namespace's update flags list and Value objects
|
||||||
|
try:
|
||||||
|
for namespace in _update_flags:
|
||||||
|
flags_list = _update_flags[namespace]
|
||||||
|
if isinstance(flags_list, list):
|
||||||
|
# Clear Value objects in the list
|
||||||
|
for flag in flags_list:
|
||||||
|
if hasattr(
|
||||||
|
flag, "value"
|
||||||
|
): # Check if it's a Value object
|
||||||
|
flag.value = False
|
||||||
|
flags_list.clear()
|
||||||
|
except Exception:
|
||||||
|
pass # Ignore any errors during update flags cleanup
|
||||||
|
_update_flags.clear()
|
||||||
|
|
||||||
# Shut down the Manager
|
# Shut down the Manager - this will automatically clean up all shared resources
|
||||||
_manager.shutdown()
|
_manager.shutdown()
|
||||||
direct_log(f"Process {os.getpid()} Manager shutdown complete")
|
direct_log(f"Process {os.getpid()} Manager shutdown complete")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -340,6 +366,9 @@ def finalize_share_data():
|
|||||||
is_multiprocess = None
|
is_multiprocess = None
|
||||||
_shared_dicts = None
|
_shared_dicts = None
|
||||||
_init_flags = None
|
_init_flags = None
|
||||||
_global_lock = None
|
_storage_lock = None
|
||||||
|
_internal_lock = None
|
||||||
|
_pipeline_status_lock = None
|
||||||
|
_update_flags = None
|
||||||
|
|
||||||
direct_log(f"Process {os.getpid()} storage data finalization complete")
|
direct_log(f"Process {os.getpid()} storage data finalization complete")
|
||||||
|
Reference in New Issue
Block a user