Refactor shared storage locks into separate pipeline, storage, and internal locks to prevent deadlocks

commit c07a5039b7
parent d3de57c1e4
Author: yangdx
Date:   2025-03-01 10:48:55 +08:00

3 changed files with 59 additions and 38 deletions
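The change replaces the single _global_lock with three locks: _internal_lock for shared-storage bookkeeping, _storage_lock for storage data, and _pipeline_status_lock for pipeline coordination. The sketch below is illustrative only, not code from this commit: asyncio.Lock is not reentrant, so a helper that acquires the one global lock deadlocks any caller that already holds it, while giving the helper its own lock makes the same call pattern safe.

import asyncio

# Illustrative only -- not code from this commit.
# asyncio.Lock is not reentrant: awaiting a lock the current task
# already holds never returns.

_global_lock = asyncio.Lock()

async def get_namespace_data_old(store: dict, namespace: str) -> dict:
    async with _global_lock:            # helper takes the one global lock
        return store.setdefault(namespace, {})

async def worker_old(store: dict) -> None:
    async with _global_lock:            # caller already holds the same lock...
        # ...so this nested acquire waits forever: deadlock.
        status = await get_namespace_data_old(store, "pipeline_status")
        status["busy"] = True

# With the locks split, bookkeeping and pipeline coordination no longer
# contend for one lock, so the same call pattern is safe:

_internal_lock = asyncio.Lock()
_pipeline_status_lock = asyncio.Lock()

async def get_namespace_data_new(store: dict, namespace: str) -> dict:
    async with _internal_lock:          # bookkeeping lock only
        return store.setdefault(namespace, {})

async def worker_new(store: dict) -> None:
    status = await get_namespace_data_new(store, "pipeline_status")
    async with _pipeline_status_lock:   # coordination lock only
        status["busy"] = True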


@@ -138,17 +138,17 @@ def create_app(args):
     # Import necessary functions from shared_storage
     from lightrag.kg.shared_storage import (
         get_namespace_data,
-        get_storage_lock,
-        initialize_pipeline_namespace,
+        get_pipeline_status_lock,
+        initialize_pipeline_status,
     )

-    await initialize_pipeline_namespace()
+    await initialize_pipeline_status()

     # Auto scan documents if enabled
     if args.auto_scan_at_startup:
         # Check if a task is already running (with lock protection)
         pipeline_status = await get_namespace_data("pipeline_status")
         should_start_task = False
-        async with get_storage_lock():
+        async with get_pipeline_status_lock():
             if not pipeline_status.get("busy", False):
                 should_start_task = True
         # Only start the task if no other task is running


@@ -16,6 +16,22 @@ def direct_log(message, level="INFO"):

 T = TypeVar('T')
+LockType = Union[ProcessLock, asyncio.Lock]
+
+is_multiprocess = None
+_workers = None
+_manager = None
+_initialized = None
+
+# shared data for storage across processes
+_shared_dicts: Optional[Dict[str, Any]] = None
+_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
+_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+
+# locks for mutex access
+_storage_lock: Optional[LockType] = None
+_internal_lock: Optional[LockType] = None
+_pipeline_status_lock: Optional[LockType] = None

 class UnifiedLock(Generic[T]):
     """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -39,30 +55,37 @@ class UnifiedLock(Generic[T]):
     def __enter__(self) -> 'UnifiedLock[T]':
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.acquire()
         return self

     def __exit__(self, exc_type, exc_val, exc_tb):
         """For backward compatibility"""
         if self._is_async:
-            raise RuntimeError("Use 'async with' for asyncio.Lock")
+            raise RuntimeError("Use 'async with' for shared_storage lock")
         self._lock.release()

-LockType = Union[ProcessLock, asyncio.Lock]
+def get_internal_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_internal_lock,
+        is_async=not is_multiprocess
+    )

-is_multiprocess = None
-_workers = None
-_manager = None
-_initialized = None
-_global_lock: Optional[LockType] = None
+def get_storage_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_storage_lock,
+        is_async=not is_multiprocess
+    )

-# shared data for storage across processes
-_shared_dicts: Optional[Dict[str, Any]] = None
-_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
-_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated
+def get_pipeline_status_lock() -> UnifiedLock:
+    """return unified storage lock for data consistency"""
+    return UnifiedLock(
+        lock=_pipeline_status_lock,
+        is_async=not is_multiprocess
+    )

 def initialize_share_data(workers: int = 1):
     """
@@ -87,7 +110,9 @@ def initialize_share_data(workers: int = 1):
         _workers, \
         is_multiprocess, \
         is_multiprocess, \
-        _global_lock, \
+        _storage_lock, \
+        _internal_lock, \
+        _pipeline_status_lock, \
         _shared_dicts, \
         _init_flags, \
         _initialized, \
@@ -105,7 +130,9 @@ def initialize_share_data(workers: int = 1):
     if workers > 1:
         is_multiprocess = True
-        _global_lock = _manager.Lock()
+        _internal_lock = _manager.Lock()
+        _storage_lock = _manager.Lock()
+        _pipeline_status_lock = _manager.Lock()
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
@@ -114,7 +141,9 @@ def initialize_share_data(workers: int = 1):
         )
     else:
         is_multiprocess = False
-        _global_lock = asyncio.Lock()
+        _internal_lock = asyncio.Lock()
+        _storage_lock = asyncio.Lock()
+        _pipeline_status_lock = asyncio.Lock()
         _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}
@@ -124,13 +153,13 @@ def initialize_share_data(workers: int = 1):
     _initialized = True

-async def initialize_pipeline_namespace():
+async def initialize_pipeline_status():
     """
     Initialize pipeline namespace with default values.
     """
     pipeline_namespace = await get_namespace_data("pipeline_status")

-    async with get_storage_lock():
+    async with get_internal_lock():
         # Check if already initialized by checking for required fields
         if "busy" in pipeline_namespace:
             return
@@ -160,7 +189,7 @@ async def get_update_flag(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")

-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             if is_multiprocess and _manager is not None:
                 _update_flags[namespace] = _manager.list()
@@ -182,7 +211,7 @@ async def set_all_update_flags(namespace: str):
     if _update_flags is None:
         raise ValueError("Try to create namespace before Shared-Data is initialized")

-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _update_flags:
             raise ValueError(f"Namespace {namespace} not found in update flags")
         # Update flags for both modes
@@ -215,14 +244,6 @@ def try_initialize_namespace(namespace: str) -> bool:
     return False

-def get_storage_lock() -> UnifiedLock:
-    """return unified storage lock for data consistency"""
-    return UnifiedLock(
-        lock=_global_lock,
-        is_async=not is_multiprocess
-    )

 async def get_namespace_data(namespace: str) -> Dict[str, Any]:
     """get storage space for specific storage type(namespace)"""
     if _shared_dicts is None:
@@ -232,7 +253,7 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
         )
         raise ValueError("Shared dictionaries not initialized")

-    async with get_storage_lock():
+    async with get_internal_lock():
         if namespace not in _shared_dicts:
             if is_multiprocess and _manager is not None:
                 _shared_dicts[namespace] = _manager.dict()
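Taken together, the shared_storage changes mean get_namespace_data and the flag helpers now serialize on _internal_lock, so callers may safely hold the pipeline status lock around them. A minimal usage sketch, assuming the import paths shown above (mark_busy is a hypothetical helper, not part of this commit):

from lightrag.kg.shared_storage import (
    get_internal_lock,          # serializes _shared_dicts/_update_flags bookkeeping
    get_pipeline_status_lock,   # serializes pipeline_status reads/writes
    get_storage_lock,           # still available for storage-data consistency
    get_namespace_data,
)

async def mark_busy() -> bool:
    """Hypothetical helper: atomically claim the pipeline if it is idle."""
    # get_namespace_data takes _internal_lock internally, so calling it
    # here cannot deadlock against the pipeline status lock below.
    pipeline_status = await get_namespace_data("pipeline_status")
    async with get_pipeline_status_lock():
        if pipeline_status.get("busy", False):
            return False
        pipeline_status["busy"] = True
        return True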


@@ -669,15 +669,15 @@ class LightRAG:
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
-        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock

         # Get pipeline status shared data and lock
         pipeline_status = await get_namespace_data("pipeline_status")
-        storage_lock = get_storage_lock()
+        pipeline_status_lock = get_pipeline_status_lock()

         # Check if another process is already processing the queue
         process_documents = False
-        async with storage_lock:
+        async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
                 # Cleaning history_messages without breaking it as a shared list object
@@ -851,7 +851,7 @@ class LightRAG:
                 # Check if there's a pending request to process more documents (with lock)
                 has_pending_request = False
-                async with storage_lock:
+                async with pipeline_status_lock:
                     has_pending_request = pipeline_status.get("request_pending", False)

                     if has_pending_request:
                         # Clear the request flag before checking for more documents
@@ -869,7 +869,7 @@ class LightRAG:
                 log_message = "Document processing pipeline completed"
                 logger.info(log_message)

                 # Always reset busy status when done or if an exception occurs (with lock)
-                async with storage_lock:
+                async with pipeline_status_lock:
                     pipeline_status["busy"] = False
                     pipeline_status["latest_message"] = log_message
                     pipeline_status["history_messages"].append(log_message)