simplify process state management by removing redundant multiprocess flag

This commit is contained in:
yangdx
2025-02-27 15:36:12 +08:00
parent 1699b10a25
commit e881bc0709

View File

@@ -18,7 +18,6 @@ LockType = Union[ProcessLock, ThreadLock]
_manager = None _manager = None
_initialized = None _initialized = None
_is_multiprocess = None
is_multiprocess = None is_multiprocess = None
# shared data for storage across processes # shared data for storage across processes
@@ -47,21 +46,18 @@ def initialize_share_data(workers: int = 1):
workers (int): Number of worker processes. If 1, single-process mode is used. workers (int): Number of worker processes. If 1, single-process mode is used.
If > 1, multi-process mode with shared memory is used. If > 1, multi-process mode with shared memory is used.
""" """
global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized global _manager, is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
# Check if already initialized # Check if already initialized
if _initialized and _initialized.value: if _initialized:
is_multiprocess = _is_multiprocess.value direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={is_multiprocess})")
direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={_is_multiprocess.value})")
return return
_manager = Manager() _manager = Manager()
_initialized = _manager.Value("b", False)
_is_multiprocess = _manager.Value("b", False)
# Force multi-process mode if workers > 1 # Force multi-process mode if workers > 1
if workers > 1: if workers > 1:
_is_multiprocess.value = True is_multiprocess = True
_global_lock = _manager.Lock() _global_lock = _manager.Lock()
# Create shared dictionaries with manager # Create shared dictionaries with manager
_shared_dicts = _manager.dict() _shared_dicts = _manager.dict()
@@ -69,7 +65,7 @@ def initialize_share_data(workers: int = 1):
_init_flags = _manager.dict() # Use shared dictionary to store initialization flags _init_flags = _manager.dict() # Use shared dictionary to store initialization flags
direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})") direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})")
else: else:
_is_multiprocess.value = False is_multiprocess = False
_global_lock = ThreadLock() _global_lock = ThreadLock()
_shared_dicts = {} _shared_dicts = {}
_share_objects = {} _share_objects = {}
@@ -77,8 +73,7 @@ def initialize_share_data(workers: int = 1):
direct_log(f"Process {os.getpid()} storage data created for Single Process") direct_log(f"Process {os.getpid()} storage data created for Single Process")
# Mark as initialized # Mark as initialized
_initialized.value = True _initialized = True
is_multiprocess = _is_multiprocess.value
def try_initialize_namespace(namespace: str) -> bool: def try_initialize_namespace(namespace: str) -> bool:
""" """
@@ -87,7 +82,7 @@ def try_initialize_namespace(namespace: str) -> bool:
""" """
global _init_flags, _manager global _init_flags, _manager
if _is_multiprocess.value: if is_multiprocess:
if _init_flags is None: if _init_flags is None:
raise RuntimeError( raise RuntimeError(
"Shared storage not initialized. Call initialize_share_data() first." "Shared storage not initialized. Call initialize_share_data() first."
@@ -132,7 +127,7 @@ def get_namespace_object(namespace: str) -> Any:
lock = _get_global_lock() lock = _get_global_lock()
with lock: with lock:
if namespace not in _share_objects: if namespace not in _share_objects:
if _is_multiprocess.value: if is_multiprocess:
_share_objects[namespace] = _manager.Value("O", None) _share_objects[namespace] = _manager.Value("O", None)
else: else:
_share_objects[namespace] = None _share_objects[namespace] = None
@@ -166,17 +161,17 @@ def finalize_share_data():
In multi-process mode, it shuts down the Manager and releases all shared objects. In multi-process mode, it shuts down the Manager and releases all shared objects.
In single-process mode, it simply resets the global variables. In single-process mode, it simply resets the global variables.
""" """
global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized global _manager, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
# Check if already initialized # Check if already initialized
if not (_initialized and _initialized.value): if not _initialized:
direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize") direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize")
return return
direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess.value})") direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})")
# In multi-process mode, shut down the Manager # In multi-process mode, shut down the Manager
if _is_multiprocess.value and _manager is not None: if is_multiprocess and _manager is not None:
try: try:
# Clear shared dictionaries first # Clear shared dictionaries first
if _shared_dicts is not None: if _shared_dicts is not None:
@@ -195,7 +190,6 @@ def finalize_share_data():
# Reset global variables # Reset global variables
_manager = None _manager = None
_initialized = None _initialized = None
_is_multiprocess = None
is_multiprocess = None is_multiprocess = None
_shared_dicts = None _shared_dicts = None
_share_objects = None _share_objects = None