Merge pull request #1145 from danielaskdd/optimize-doc-scan

Optimize parallel file processing in folder scan and fix a deadlock problem in Gunicorn multi-worker mode

Daniel.y, committed by GitHub
2025-03-21 17:21:47 +08:00
7 changed files with 145 additions and 24 deletions

View File

@@ -3,6 +3,9 @@ from datetime import datetime, timedelta
 import jwt
 from fastapi import HTTPException, status
 from pydantic import BaseModel
+from dotenv import load_dotenv
+
+load_dotenv()
 
 class TokenPayload(BaseModel):
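
A note on the load_dotenv() changes in this PR (this file and several below): called with no arguments, python-dotenv looks for a .env file and, since override defaults to False, it does not clobber variables already set in the process environment. A minimal sketch of the behavioral difference, assuming a .env file containing TIMEOUT=999:

    import os
    from dotenv import load_dotenv

    os.environ["TIMEOUT"] = "60"   # e.g. exported by the shell or Gunicorn
    load_dotenv()                  # new call: TIMEOUT stays "60"
    print(os.getenv("TIMEOUT"))    # -> "60"
    # The old call, load_dotenv(".env", override=True), would have
    # overwritten it with "999" from the .env file.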

View File

@@ -29,7 +29,9 @@ preload_app = True
 worker_class = "uvicorn.workers.UvicornWorker"
 
 # Other Gunicorn configurations
-timeout = int(os.getenv("TIMEOUT", 150))  # Default 150s to match run_with_gunicorn.py
+timeout = int(
+    os.getenv("TIMEOUT", 150 * 2)
+)  # Default 150s *2 to match run_with_gunicorn.py
 keepalive = int(os.getenv("KEEPALIVE", 5))  # Default 5s
 
 # Logging configuration
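
With TIMEOUT unset in the environment, the reworked expression doubles the previous worker timeout; a quick check of the resulting defaults:

    import os

    timeout = int(os.getenv("TIMEOUT", 150 * 2))   # -> 300 (was 150)
    keepalive = int(os.getenv("KEEPALIVE", 5))     # -> 5, unchanged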

View File

@@ -49,7 +49,7 @@ from .auth import auth_handler
 # Load environment variables
 # Updated to use the .env that is inside the current folder
 # This update allows the user to put a different.env file for each lightrag folder
-load_dotenv(".env", override=True)
+load_dotenv()
 
 # Initialize config parser
 config = configparser.ConfigParser()

View File

@@ -405,7 +405,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path):
 
 async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
-    """Index multiple files concurrently
+    """Index multiple files sequentially to avoid high CPU load
 
     Args:
         rag: LightRAG instance
@@ -416,12 +416,12 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
     try:
         enqueued = False
 
-        if len(file_paths) == 1:
-            enqueued = await pipeline_enqueue_file(rag, file_paths[0])
-        else:
-            tasks = [pipeline_enqueue_file(rag, path) for path in file_paths]
-            enqueued = any(await asyncio.gather(*tasks))
+        # Process files sequentially
+        for file_path in file_paths:
+            if await pipeline_enqueue_file(rag, file_path):
+                enqueued = True
 
+        # Process the queue only if at least one file was successfully enqueued
         if enqueued:
             await rag.apipeline_process_enqueue_documents()
     except Exception as e:
@@ -472,14 +472,34 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         total_files = len(new_files)
         logger.info(f"Found {total_files} new files to index.")
 
-        for idx, file_path in enumerate(new_files):
-            try:
-                await pipeline_index_file(rag, file_path)
-            except Exception as e:
-                logger.error(f"Error indexing file {file_path}: {str(e)}")
+        if not new_files:
+            return
+
+        # Get MAX_PARALLEL_INSERT from global_args
+        max_parallel = global_args["max_parallel_insert"]
+        # Calculate batch size as 2 * MAX_PARALLEL_INSERT
+        batch_size = 2 * max_parallel
+
+        # Process files in batches
+        for i in range(0, total_files, batch_size):
+            batch_files = new_files[i : i + batch_size]
+            batch_num = i // batch_size + 1
+            total_batches = (total_files + batch_size - 1) // batch_size
+
+            logger.info(
+                f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files"
+            )
+            await pipeline_index_files(rag, batch_files)
+
+            # Log progress
+            processed = min(i + batch_size, total_files)
+            logger.info(
+                f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)"
+            )
 
     except Exception as e:
         logger.error(f"Error during scanning process: {str(e)}")
+        logger.error(traceback.format_exc())
 
 
 def create_document_routes(
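
For reference, the batching arithmetic above with the default MAX_PARALLEL_INSERT of 2 gives batch_size = 4, so ten new files are indexed as three batches (4, 4, 2). A standalone sketch:

    new_files = [f"doc_{n}.txt" for n in range(10)]
    total_files = len(new_files)
    batch_size = 2 * 2  # 2 * MAX_PARALLEL_INSERT
    total_batches = (total_files + batch_size - 1) // batch_size  # ceil(10/4) = 3

    for i in range(0, total_files, batch_size):
        batch = new_files[i : i + batch_size]
        print(f"batch {i // batch_size + 1}/{total_batches}: {len(batch)} files")
    # -> batches of 4, 4, and 2 files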

View File

@@ -13,7 +13,7 @@ from dotenv import load_dotenv
 # Updated to use the .env that is inside the current folder
 # This update allows the user to put a different.env file for each lightrag folder
-load_dotenv(".env")
+load_dotenv()
 
 
 def check_and_install_dependencies():
@@ -140,7 +140,7 @@ def main():
     # Timeout configuration prioritizes command line arguments
     gunicorn_config.timeout = (
-        args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150))
+        args.timeout if args.timeout * 2 else int(os.getenv("TIMEOUT", 150 * 2))
     )
 
     # Keepalive configuration

View File

@@ -16,7 +16,7 @@ from starlette.status import HTTP_403_FORBIDDEN
 from .auth import auth_handler
 
 # Load environment variables
-load_dotenv(override=True)
+load_dotenv()
 
 global_args = {"main_args": None}
@@ -365,6 +365,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
         "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE
     )
 
+    # Get MAX_PARALLEL_INSERT from environment
+    global_args["max_parallel_insert"] = get_env_value("MAX_PARALLEL_INSERT", 2, int)
+
     # Handle openai-ollama special case
     if args.llm_binding == "openai-ollama":
         args.llm_binding = "openai"
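
The new knob is read from the environment, so it can be tuned per deployment; a sketch of the effect (using plain os.getenv in place of the project's get_env_value helper):

    import os

    os.environ["MAX_PARALLEL_INSERT"] = "4"                   # e.g. set in .env
    max_parallel = int(os.getenv("MAX_PARALLEL_INSERT", 2))   # -> 4
    batch_size = 2 * max_parallel                             # -> 8 files per scan batch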
@@ -441,8 +444,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.yellow(f"{args.log_level}")
     ASCIIColors.white("    ├─ Verbose Debug: ", end="")
     ASCIIColors.yellow(f"{args.verbose}")
-    ASCIIColors.white("    ├─ Timeout: ", end="")
-    ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
+    ASCIIColors.white("    ├─ History Turns: ", end="")
+    ASCIIColors.yellow(f"{args.history_turns}")
     ASCIIColors.white("    └─ API Key: ", end="")
     ASCIIColors.yellow("Set" if args.key else "Not Set")
@@ -459,8 +462,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.yellow(f"{args.llm_binding}")
     ASCIIColors.white("    ├─ Host: ", end="")
     ASCIIColors.yellow(f"{args.llm_binding_host}")
-    ASCIIColors.white("    └─ Model: ", end="")
+    ASCIIColors.white("    ├─ Model: ", end="")
     ASCIIColors.yellow(f"{args.llm_model}")
+    ASCIIColors.white("    └─ Timeout: ", end="")
+    ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
 
     # Embedding Configuration
     ASCIIColors.magenta("\n📊 Embedding Configuration:")
@@ -475,8 +480,10 @@ def display_splash_screen(args: argparse.Namespace) -> None:
 
     # RAG Configuration
     ASCIIColors.magenta("\n⚙️ RAG Configuration:")
-    ASCIIColors.white("    ├─ Max Async Operations: ", end="")
+    ASCIIColors.white("    ├─ Max Async for LLM: ", end="")
     ASCIIColors.yellow(f"{args.max_async}")
+    ASCIIColors.white("    ├─ Max Parallel Insert: ", end="")
+    ASCIIColors.yellow(f"{global_args['max_parallel_insert']}")
     ASCIIColors.white("    ├─ Max Tokens: ", end="")
     ASCIIColors.yellow(f"{args.max_tokens}")
     ASCIIColors.white("    ├─ Max Embed Tokens: ", end="")
@@ -485,8 +492,6 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.yellow(f"{args.chunk_size}")
     ASCIIColors.white("    ├─ Chunk Overlap Size: ", end="")
     ASCIIColors.yellow(f"{args.chunk_overlap_size}")
-    ASCIIColors.white("    ├─ History Turns: ", end="")
-    ASCIIColors.yellow(f"{args.history_turns}")
     ASCIIColors.white("    ├─ Cosine Threshold: ", end="")
     ASCIIColors.yellow(f"{args.cosine_threshold}")
     ASCIIColors.white("    ├─ Top-K: ", end="")

View File

@@ -41,6 +41,9 @@ _pipeline_status_lock: Optional[LockType] = None
 _graph_db_lock: Optional[LockType] = None
 _data_init_lock: Optional[LockType] = None
 
+# async locks for coroutine synchronization in multiprocess mode
+_async_locks: Optional[Dict[str, asyncio.Lock]] = None
+
 
 class UnifiedLock(Generic[T]):
     """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
@@ -51,12 +54,14 @@ class UnifiedLock(Generic[T]):
         is_async: bool,
         name: str = "unnamed",
         enable_logging: bool = True,
+        async_lock: Optional[asyncio.Lock] = None,
     ):
         self._lock = lock
         self._is_async = is_async
         self._pid = os.getpid()  # for debug only
         self._name = name  # for debug only
         self._enable_logging = enable_logging  # for debug only
+        self._async_lock = async_lock  # auxiliary lock for coroutine synchronization
 
     async def __aenter__(self) -> "UnifiedLock[T]":
         try:
@@ -64,16 +69,39 @@ class UnifiedLock(Generic[T]):
                 f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
+
+            # If in multiprocess mode and async lock exists, acquire it first
+            if not self._is_async and self._async_lock is not None:
+                direct_log(
+                    f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
+                    enable_output=self._enable_logging,
+                )
+                await self._async_lock.acquire()
+                direct_log(
+                    f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
+                    enable_output=self._enable_logging,
+                )
+
+            # Then acquire the main lock
             if self._is_async:
                 await self._lock.acquire()
             else:
                 self._lock.acquire()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
             return self
         except Exception as e:
+            # If main lock acquisition fails, release the async lock if it was acquired
+            if (
+                not self._is_async
+                and self._async_lock is not None
+                and self._async_lock.locked()
+            ):
+                self._async_lock.release()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                 level="ERROR",
@@ -82,15 +110,29 @@ class UnifiedLock(Generic[T]):
             raise
 
     async def __aexit__(self, exc_type, exc_val, exc_tb):
+        main_lock_released = False
         try:
             direct_log(
                 f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
+
+            # Release main lock first
             if self._is_async:
                 self._lock.release()
             else:
                 self._lock.release()
+            main_lock_released = True
+
+            # Then release async lock if in multiprocess mode
+            if not self._is_async and self._async_lock is not None:
+                direct_log(
+                    f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'",
+                    enable_output=self._enable_logging,
+                )
+                self._async_lock.release()
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                 enable_output=self._enable_logging,
@@ -101,6 +143,31 @@ class UnifiedLock(Generic[T]):
level="ERROR", level="ERROR",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
# If main lock release failed but async lock hasn't been released, try to release it
if (
not main_lock_released
and not self._is_async
and self._async_lock is not None
):
try:
direct_log(
f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
level="WARNING",
enable_output=self._enable_logging,
)
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
enable_output=self._enable_logging,
)
except Exception as inner_e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
level="ERROR",
enable_output=self._enable_logging,
)
raise raise
def __enter__(self) -> "UnifiedLock[T]": def __enter__(self) -> "UnifiedLock[T]":
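
Why the auxiliary asyncio.Lock fixes the deadlock: in multiprocess mode the main lock is a multiprocessing lock, and its acquire() blocks the whole thread, event loop included. If one coroutine holds the lock and a second coroutine in the same worker calls the blocking acquire(), the loop freezes and the holder can never run again to release it. Gating the blocking call behind a per-process asyncio.Lock guarantees that at most one coroutine per worker ever enters it. A minimal sketch of the ordering (standalone names, not the module's actual API):

    import asyncio
    import multiprocessing

    mp_lock = multiprocessing.Lock()  # cross-process lock (blocks the thread)
    aio_lock = asyncio.Lock()         # per-process lock (yields to the event loop)

    async def guarded_work():
        async with aio_lock:          # 1) serialize coroutines in this worker
            mp_lock.acquire()         # 2) only one coroutine per worker blocks here
            try:
                ...                   # critical section on shared state
            finally:
                mp_lock.release()     # released in reverse order, as in __aexit__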
@@ -151,51 +218,61 @@ class UnifiedLock(Generic[T]):
 def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("internal_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_internal_lock,
         is_async=not is_multiprocess,
         name="internal_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("storage_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_storage_lock,
         is_async=not is_multiprocess,
         name="storage_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified storage lock for data consistency"""
+    async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_pipeline_status_lock,
         is_async=not is_multiprocess,
         name="pipeline_status_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified graph database lock for ensuring atomic operations"""
+    async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_graph_db_lock,
         is_async=not is_multiprocess,
         name="graph_db_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
 
 
 def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
     """return unified data initialization lock for ensuring atomic data initialization"""
+    async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None
     return UnifiedLock(
         lock=_data_init_lock,
         is_async=not is_multiprocess,
         name="data_init_lock",
         enable_logging=enable_logging,
+        async_lock=async_lock,
     )
@@ -229,7 +306,8 @@ def initialize_share_data(workers: int = 1):
         _shared_dicts, \
         _init_flags, \
         _initialized, \
-        _update_flags
+        _update_flags, \
+        _async_locks
 
     # Check if already initialized
     if _initialized:
@@ -251,6 +329,16 @@ def initialize_share_data(workers: int = 1):
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
+
+        # Initialize async locks for multiprocess mode
+        _async_locks = {
+            "internal_lock": asyncio.Lock(),
+            "storage_lock": asyncio.Lock(),
+            "pipeline_status_lock": asyncio.Lock(),
+            "graph_db_lock": asyncio.Lock(),
+            "data_init_lock": asyncio.Lock(),
+        }
+
         direct_log(
             f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
         )
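
Note that the two lock families live at different scopes: the _manager locks are proxies usable from every Gunicorn worker, while each worker holds its own plain asyncio.Lock instances, which only coordinate coroutines within that process's event loop. In brief:

    import asyncio
    from multiprocessing import Manager

    if __name__ == "__main__":  # guard needed on spawn-based platforms
        manager = Manager()
        mp_lock = manager.Lock()    # proxy object, shared across worker processes
        aio_lock = asyncio.Lock()   # plain object, local to this process's event loop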
@@ -264,6 +352,7 @@ def initialize_share_data(workers: int = 1):
         _shared_dicts = {}
         _init_flags = {}
         _update_flags = {}
+        _async_locks = None  # No need for async locks in single process mode
 
         direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
 
     # Mark as initialized
@@ -458,7 +547,8 @@ def finalize_share_data():
         _shared_dicts, \
         _init_flags, \
         _initialized, \
-        _update_flags
+        _update_flags, \
+        _async_locks
 
     # Check if already initialized
     if not _initialized:
@@ -523,5 +613,6 @@ def finalize_share_data():
     _graph_db_lock = None
     _data_init_lock = None
     _update_flags = None
+    _async_locks = None
 
     direct_log(f"Process {os.getpid()} storage data finalization complete")