Add graph DB lock to shared storage system

• Introduced new graph_db_lock
• Added detailed lock debugging output
This commit is contained in:
yangdx
2025-03-08 22:36:41 +08:00
parent fb4a4c736e
commit 95c06f1bde

View File

@@ -7,11 +7,17 @@ from typing import Any, Dict, Optional, Union, TypeVar, Generic
# Define a direct print function for critical logs that must be visible in all processes # Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, level="INFO"): def direct_log(message, level="INFO", enable_output: bool = True):
""" """
Log a message directly to stderr to ensure visibility in all processes, Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process. including the Gunicorn master process.
Args:
message: The message to log
level: Log level (default: "INFO")
enable_output: Whether to actually output the log (default: True)
""" """
if enable_output:
print(f"{level}: {message}", file=sys.stderr, flush=True) print(f"{level}: {message}", file=sys.stderr, flush=True)
@@ -32,55 +38,88 @@ _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
_storage_lock: Optional[LockType] = None _storage_lock: Optional[LockType] = None
_internal_lock: Optional[LockType] = None _internal_lock: Optional[LockType] = None
_pipeline_status_lock: Optional[LockType] = None _pipeline_status_lock: Optional[LockType] = None
_graph_db_lock: Optional[LockType] = None
class UnifiedLock(Generic[T]): class UnifiedLock(Generic[T]):
"""Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock""" """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool): def __init__(self, lock: Union[ProcessLock, asyncio.Lock], is_async: bool, name: str = "unnamed", enable_logging: bool = True):
self._lock = lock self._lock = lock
self._is_async = is_async self._is_async = is_async
self._pid = os.getpid() # for debug only
self._name = name # for debug only
self._enable_logging = enable_logging # for debug only
async def __aenter__(self) -> "UnifiedLock[T]": async def __aenter__(self) -> "UnifiedLock[T]":
try:
direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging)
if self._is_async: if self._is_async:
await self._lock.acquire() await self._lock.acquire()
else: else:
self._lock.acquire() self._lock.acquire()
direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", enable_output=self._enable_logging)
return self return self
except Exception as e:
direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging)
raise
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging)
if self._is_async: if self._is_async:
self._lock.release() self._lock.release()
else: else:
self._lock.release() self._lock.release()
direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", enable_output=self._enable_logging)
except Exception as e:
direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}", level="ERROR", enable_output=self._enable_logging)
raise
def __enter__(self) -> "UnifiedLock[T]": def __enter__(self) -> "UnifiedLock[T]":
"""For backward compatibility""" """For backward compatibility"""
try:
if self._is_async: if self._is_async:
raise RuntimeError("Use 'async with' for shared_storage lock") raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)", enable_output=self._enable_logging)
self._lock.acquire() self._lock.acquire()
direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)", enable_output=self._enable_logging)
return self return self
except Exception as e:
direct_log(f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging)
raise
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
"""For backward compatibility""" """For backward compatibility"""
try:
if self._is_async: if self._is_async:
raise RuntimeError("Use 'async with' for shared_storage lock") raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)", enable_output=self._enable_logging)
self._lock.release() self._lock.release()
direct_log(f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)", enable_output=self._enable_logging)
except Exception as e:
direct_log(f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}", level="ERROR", enable_output=self._enable_logging)
raise
def get_internal_lock() -> UnifiedLock: def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency""" """return unified storage lock for data consistency"""
return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess) return UnifiedLock(lock=_internal_lock, is_async=not is_multiprocess, name="internal_lock", enable_logging=enable_logging)
def get_storage_lock() -> UnifiedLock: def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency""" """return unified storage lock for data consistency"""
return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess) return UnifiedLock(lock=_storage_lock, is_async=not is_multiprocess, name="storage_lock", enable_logging=enable_logging)
def get_pipeline_status_lock() -> UnifiedLock: def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency""" """return unified storage lock for data consistency"""
return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess) return UnifiedLock(lock=_pipeline_status_lock, is_async=not is_multiprocess, name="pipeline_status_lock", enable_logging=enable_logging)
def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified graph database lock for ensuring atomic operations"""
return UnifiedLock(lock=_graph_db_lock, is_async=not is_multiprocess, name="graph_db_lock", enable_logging=enable_logging)
def initialize_share_data(workers: int = 1): def initialize_share_data(workers: int = 1):
@@ -108,6 +147,7 @@ def initialize_share_data(workers: int = 1):
_storage_lock, \ _storage_lock, \
_internal_lock, \ _internal_lock, \
_pipeline_status_lock, \ _pipeline_status_lock, \
_graph_db_lock, \
_shared_dicts, \ _shared_dicts, \
_init_flags, \ _init_flags, \
_initialized, \ _initialized, \
@@ -128,6 +168,7 @@ def initialize_share_data(workers: int = 1):
_internal_lock = _manager.Lock() _internal_lock = _manager.Lock()
_storage_lock = _manager.Lock() _storage_lock = _manager.Lock()
_pipeline_status_lock = _manager.Lock() _pipeline_status_lock = _manager.Lock()
_graph_db_lock = _manager.Lock()
_shared_dicts = _manager.dict() _shared_dicts = _manager.dict()
_init_flags = _manager.dict() _init_flags = _manager.dict()
_update_flags = _manager.dict() _update_flags = _manager.dict()
@@ -139,6 +180,7 @@ def initialize_share_data(workers: int = 1):
_internal_lock = asyncio.Lock() _internal_lock = asyncio.Lock()
_storage_lock = asyncio.Lock() _storage_lock = asyncio.Lock()
_pipeline_status_lock = asyncio.Lock() _pipeline_status_lock = asyncio.Lock()
_graph_db_lock = asyncio.Lock()
_shared_dicts = {} _shared_dicts = {}
_init_flags = {} _init_flags = {}
_update_flags = {} _update_flags = {}
@@ -304,6 +346,7 @@ def finalize_share_data():
_storage_lock, \ _storage_lock, \
_internal_lock, \ _internal_lock, \
_pipeline_status_lock, \ _pipeline_status_lock, \
_graph_db_lock, \
_shared_dicts, \ _shared_dicts, \
_init_flags, \ _init_flags, \
_initialized, \ _initialized, \
@@ -369,6 +412,7 @@ def finalize_share_data():
_storage_lock = None _storage_lock = None
_internal_lock = None _internal_lock = None
_pipeline_status_lock = None _pipeline_status_lock = None
_graph_db_lock = None
_update_flags = None _update_flags = None
direct_log(f"Process {os.getpid()} storage data finalization complete") direct_log(f"Process {os.getpid()} storage data finalization complete")