diff --git a/lightrag/api/auth.py b/lightrag/api/auth.py
index 1e92d1b9..5d9b00ac 100644
--- a/lightrag/api/auth.py
+++ b/lightrag/api/auth.py
@@ -7,6 +7,7 @@ from dotenv import load_dotenv
 
 load_dotenv()
 
+
 class TokenPayload(BaseModel):
     sub: str  # Username
     exp: datetime  # Expiration time
diff --git a/lightrag/api/gunicorn_config.py b/lightrag/api/gunicorn_config.py
index cb6c3d83..0aef108e 100644
--- a/lightrag/api/gunicorn_config.py
+++ b/lightrag/api/gunicorn_config.py
@@ -29,7 +29,9 @@ preload_app = True
 worker_class = "uvicorn.workers.UvicornWorker"
 
 # Other Gunicorn configurations
-timeout = int(os.getenv("TIMEOUT", 150 * 2))  # Default 150s *2 to match run_with_gunicorn.py
+timeout = int(
+    os.getenv("TIMEOUT", 150 * 2)
+)  # Default 150s *2 to match run_with_gunicorn.py
 keepalive = int(os.getenv("KEEPALIVE", 5))  # Default 5s
 
 # Logging configuration
diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 248505ba..e0c8f545 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -479,19 +479,23 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         max_parallel = global_args["max_parallel_insert"]
         # Calculate batch size as 2 * MAX_PARALLEL_INSERT
         batch_size = 2 * max_parallel
-
+
         # Process files in batches
         for i in range(0, total_files, batch_size):
-            batch_files = new_files[i:i+batch_size]
+            batch_files = new_files[i : i + batch_size]
             batch_num = i // batch_size + 1
             total_batches = (total_files + batch_size - 1) // batch_size
-
-            logger.info(f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files")
+
+            logger.info(
+                f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files"
+            )
             await pipeline_index_files(rag, batch_files)
-
+
             # Log progress
             processed = min(i + batch_size, total_files)
-            logger.info(f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)")
+            logger.info(
+                f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)"
+            )
 
     except Exception as e:
         logger.error(f"Error during scanning process: {str(e)}")
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index d5006d62..e26645c8 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -69,7 +69,7 @@ class UnifiedLock(Generic[T]):
                 f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
-
+
             # If in multiprocess mode and async lock exists, acquire it first
             if not self._is_async and self._async_lock is not None:
                 direct_log(
@@ -81,13 +81,13 @@ class UnifiedLock(Generic[T]):
                     f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
                     enable_output=self._enable_logging,
                 )
-
+
             # Then acquire the main lock
             if self._is_async:
                 await self._lock.acquire()
             else:
                 self._lock.acquire()
-
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
                 enable_output=self._enable_logging,
@@ -95,9 +95,13 @@ class UnifiedLock(Generic[T]):
             return self
         except Exception as e:
             # If main lock acquisition fails, release the async lock if it was acquired
-            if not self._is_async and self._async_lock is not None and self._async_lock.locked():
+            if (
+                not self._is_async
+                and self._async_lock is not None
+                and self._async_lock.locked()
+            ):
                 self._async_lock.release()
-
+
             direct_log(
                 f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                 level="ERROR",
@@ -112,15 +116,15 @@ class UnifiedLock(Generic[T]):
                 f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
                 enable_output=self._enable_logging,
             )
-
+
             # Release main lock first
             if self._is_async:
                 self._lock.release()
             else:
                 self._lock.release()
-
+
             main_lock_released = True
-
+
             # Then release async lock if in multiprocess mode
             if not self._is_async and self._async_lock is not None:
                 direct_log(
@@ -128,7 +132,7 @@ class UnifiedLock(Generic[T]):
                     enable_output=self._enable_logging,
                 )
                 self._async_lock.release()
-
+
             direct_log(
                 f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                 enable_output=self._enable_logging,
@@ -139,9 +143,13 @@ class UnifiedLock(Generic[T]):
                 level="ERROR",
                 enable_output=self._enable_logging,
             )
-
+
             # If main lock release failed but async lock hasn't been released, try to release it
-            if not main_lock_released and not self._is_async and self._async_lock is not None:
+            if (
+                not main_lock_released
+                and not self._is_async
+                and self._async_lock is not None
+            ):
                 try:
                     direct_log(
                         f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
@@ -159,7 +167,7 @@ class UnifiedLock(Generic[T]):
                         level="ERROR",
                         enable_output=self._enable_logging,
                     )
-
+
             raise
 
     def __enter__(self) -> "UnifiedLock[T]":
@@ -321,7 +329,7 @@ def initialize_share_data(workers: int = 1):
         _shared_dicts = _manager.dict()
         _init_flags = _manager.dict()
         _update_flags = _manager.dict()
-
+
         # Initialize async locks for multiprocess mode
         _async_locks = {
             "internal_lock": asyncio.Lock(),
@@ -330,7 +338,7 @@
             "graph_db_lock": asyncio.Lock(),
             "data_init_lock": asyncio.Lock(),
         }
-
+
         direct_log(
             f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
        )
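
Note on the UnifiedLock hunks above: they only reflow existing logic, but the ordering they preserve is easy to miss in a pure formatting diff. Below is a minimal, self-contained sketch of that ordering, not LightRAG's implementation; the TwoStageLock class and its attribute names are illustrative only. In multiprocess mode the per-process asyncio lock is acquired before the cross-process lock and released after it, and a failure on the main lock releases the asyncio lock again.

import asyncio
import multiprocessing


class TwoStageLock:
    """Illustrative async context manager pairing an asyncio.Lock (per event
    loop) with a multiprocessing.Lock (cross process), mirroring the
    acquire/release ordering shown in the shared_storage.py hunks."""

    def __init__(self) -> None:
        self._async_lock = asyncio.Lock()       # guards the event-loop side
        self._mp_lock = multiprocessing.Lock()  # shared across worker processes

    async def __aenter__(self) -> "TwoStageLock":
        # Acquire the async lock first.
        await self._async_lock.acquire()
        try:
            # Blocking acquire of the cross-process lock; a production version
            # would offload this to a thread to avoid stalling the event loop.
            self._mp_lock.acquire()
        except BaseException:
            # Main-lock acquisition failed: give the async lock back.
            self._async_lock.release()
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Release in reverse order: main lock first, async lock last.
        try:
            self._mp_lock.release()
        finally:
            self._async_lock.release()


async def demo() -> None:
    async with TwoStageLock():
        print("critical section")


if __name__ == "__main__":
    asyncio.run(demo())

The asyncio lock serializes coroutines within one worker before any of them touches the cross-process lock, which is why it is taken first and given back last.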