diff --git a/gunicorn_config.py b/gunicorn_config.py
index 0ca6f9d9..810fc721 100644
--- a/gunicorn_config.py
+++ b/gunicorn_config.py
@@ -28,58 +28,55 @@ keepalive = 5
 errorlog = os.getenv("ERROR_LOG", log_file_path)  # 默认写入到 lightrag.log
 accesslog = os.getenv("ACCESS_LOG", log_file_path)  # 默认写入到 lightrag.log
 
-# 配置日志系统
 logconfig_dict = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'standard': {
-            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
+    "version": 1,
+    "disable_existing_loggers": False,
+    "formatters": {
+        "standard": {"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"},
+    },
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler",
+            "formatter": "standard",
+            "stream": "ext://sys.stdout",
+        },
+        "file": {
+            "class": "logging.handlers.RotatingFileHandler",
+            "formatter": "standard",
+            "filename": log_file_path,
+            "maxBytes": 10485760,  # 10MB
+            "backupCount": 5,
+            "encoding": "utf8",
         },
     },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'standard',
-            'stream': 'ext://sys.stdout'
-        },
-        'file': {
-            'class': 'logging.handlers.RotatingFileHandler',
-            'formatter': 'standard',
-            'filename': log_file_path,
-            'maxBytes': 10485760,  # 10MB
-            'backupCount': 5,
-            'encoding': 'utf8'
-        }
-    },
-    'filters': {
-        'path_filter': {
-            '()': 'lightrag.api.lightrag_server.LightragPathFilter',
+    "filters": {
+        "path_filter": {
+            "()": "lightrag.api.lightrag_server.LightragPathFilter",
         },
     },
-    'loggers': {
-        'lightrag': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False
+    "loggers": {
+        "lightrag": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
         },
-        'gunicorn': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False
+        "gunicorn": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
         },
-        'gunicorn.error': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False
+        "gunicorn.error": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
         },
-        'gunicorn.access': {
-            'handlers': ['console', 'file'],
-            'level': loglevel.upper() if loglevel else 'INFO',
-            'propagate': False,
-            'filters': ['path_filter']
-        }
-    }
+        "gunicorn.access": {
+            "handlers": ["console", "file"],
+            "level": loglevel.upper() if loglevel else "INFO",
+            "propagate": False,
+            "filters": ["path_filter"],
+        },
+    },
 }
@@ -134,14 +131,15 @@ def post_fork(server, worker):
     """
     # Set lightrag logger level in worker processes using gunicorn's loglevel
    from lightrag.utils import logger
+
    logger.setLevel(loglevel.upper())
-
+
    # Disable uvicorn.error logger in worker processes
    uvicorn_error_logger = logging.getLogger("uvicorn.error")
    uvicorn_error_logger.setLevel(logging.CRITICAL)
    uvicorn_error_logger.handlers = []
    uvicorn_error_logger.propagate = False
-
+
    # Add log filter to uvicorn.access handler in worker processes
    uvicorn_access_logger = logging.getLogger("uvicorn.access")
    path_filter = LightragPathFilter()
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 8f7a6781..d00d39d1 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -9,7 +9,6 @@ from fastapi import (
 from fastapi.responses import FileResponse
 import asyncio
 import os
-import json
 import logging
 import logging.config
 import uvicorn
@@ -139,17 +138,20 @@
         # Auto scan documents if enabled
         if args.auto_scan_at_startup:
             # Import necessary functions from shared_storage
-            from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
-
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_storage_lock,
+            )
+
             # Get pipeline status and lock
             pipeline_status = get_namespace_data("pipeline_status")
             storage_lock = get_storage_lock()
-
+
             # Check if a task is already running (with lock protection)
             should_start_task = False
             with storage_lock:
                 if not pipeline_status.get("busy", False):
-                    should_start_task = True
+                    should_start_task = True  # Only start the task if no other task is running
 
             if should_start_task:
                 # Create background task
@@ -430,7 +432,7 @@ def configure_logging():
 
     # Configure basic logging
     log_file_path = os.path.abspath(os.path.join(os.getcwd(), "lightrag.log"))
-
+
     logging.config.dictConfig(
         {
             "version": 1,
@@ -453,7 +455,7 @@ def configure_logging():
                     "formatter": "detailed",
                     "class": "logging.handlers.RotatingFileHandler",
                     "filename": log_file_path,
-                    "maxBytes": 10*1024*1024,  # 10MB
+                    "maxBytes": 10 * 1024 * 1024,  # 10MB
                     "backupCount": 5,
                     "encoding": "utf-8",
                 },
diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 3126b8ce..3fdbdf9e 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -406,7 +406,6 @@ def create_document_routes(
         background_tasks.add_task(run_scanning_process, rag, doc_manager)
         return {"status": "scanning_started"}
 
-
     @router.post("/upload", dependencies=[Depends(optional_api_key)])
     async def upload_to_input_dir(
         background_tasks: BackgroundTasks, file: UploadFile = File(...)
@@ -657,29 +656,30 @@
     async def get_pipeline_status():
         """
         Get the current status of the document indexing pipeline.
-
+
         This endpoint returns information about the current state of the document processing pipeline,
         including whether it's busy, the current job name, when it started, how many documents
         are being processed, how many batches there are, and which batch is currently being processed.
-
+
         Returns:
             dict: A dictionary containing the pipeline status information
         """
         try:
             from lightrag.kg.shared_storage import get_namespace_data
+
             pipeline_status = get_namespace_data("pipeline_status")
-
+
             # Convert to regular dict if it's a Manager.dict
             status_dict = dict(pipeline_status)
-
+
             # Convert history_messages to a regular list if it's a Manager.list
             if "history_messages" in status_dict:
                 status_dict["history_messages"] = list(status_dict["history_messages"])
-
+
             # Format the job_start time if it exists
             if status_dict.get("job_start"):
                 status_dict["job_start"] = str(status_dict["job_start"])
-
+
             return status_dict
         except Exception as e:
             logger.error(f"Error getting pipeline status: {str(e)}")
diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py
index 4b5e0a28..ed1250d4 100644
--- a/lightrag/api/utils_api.py
+++ b/lightrag/api/utils_api.py
@@ -295,7 +295,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
         original_workers = args.workers
         args.workers = 1
         # Log warning directly here
-        logging.warning(f"In uvicorn mode, workers parameter was set to {original_workers}. Forcing workers=1")
+        logging.warning(
+            f"In uvicorn mode, workers parameter was set to {original_workers}. Forcing workers=1"
+        )
 
     # convert relative path to absolute path
     args.working_dir = os.path.abspath(args.working_dir)
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index ee5bc397..2dfcae44 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -274,6 +274,7 @@ class LightRAG:
         from lightrag.kg.shared_storage import (
             initialize_share_data,
         )
+
         initialize_share_data()
 
         if not os.path.exists(self.working_dir):
@@ -671,44 +672,45 @@ class LightRAG:
         4. Update the document status
         """
         from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
-
+
         # Get pipeline status shared data and lock
         pipeline_status = get_namespace_data("pipeline_status")
         storage_lock = get_storage_lock()
-
+
         # Check if another process is already processing the queue
         process_documents = False
         with storage_lock:
+            # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
-                # No other process is busy, we can process documents
-                # 获取当前的 history_messages 列表
+                # Cleaning history_messages without breaking it as a shared list object
                 current_history = pipeline_status.get("history_messages", [])
-
-                # 清空当前列表内容但保持同一个列表对象
                 if hasattr(current_history, "clear"):
                     current_history.clear()
-
-                pipeline_status.update({
-                    "busy": True,
-                    "job_name": "indexing files",
-                    "job_start": datetime.now().isoformat(),
-                    "docs": 0,
-                    "batchs": 0,
-                    "cur_batch": 0,
-                    "request_pending": False,  # Clear any previous request
-                    "latest_message": "",
-                    # 保持使用同一个列表对象
-                    "history_messages": current_history,
-                })
+
+                pipeline_status.update(
+                    {
+                        "busy": True,
+                        "job_name": "indexing files",
+                        "job_start": datetime.now().isoformat(),
+                        "docs": 0,
+                        "batchs": 0,
+                        "cur_batch": 0,
+                        "request_pending": False,  # Clear any previous request
+                        "latest_message": "",
+                        "history_messages": current_history,  # keep it as a shared list object
+                    }
+                )
                 process_documents = True
             else:
                 # Another process is busy, just set request flag and return
                 pipeline_status["request_pending"] = True
-                logger.info("Another process is already processing the document queue. Request queued.")
-
+                logger.info(
+                    "Another process is already processing the document queue. Request queued."
+                )
+
         if not process_documents:
             return
-
+
         try:
             # Process documents until no more documents or requests
             while True:
@@ -734,7 +736,7 @@ class LightRAG:
                 # Update pipeline status with document count (with lock)
                 with storage_lock:
                     pipeline_status["docs"] = len(to_process_docs)
-
+
                 # 2. split docs into chunks, insert chunks, update doc status
                 docs_batches = [
                     list(to_process_docs.items())[i : i + self.max_parallel_insert]
@@ -742,11 +744,8 @@
                 ]
 
                 # Update pipeline status with batch information (directly, as it's atomic)
-                pipeline_status.update({
-                    "batchs": len(docs_batches),
-                    "cur_batch": 0
-                })
-
+                pipeline_status.update({"batchs": len(docs_batches), "cur_batch": 0})
+
                 log_message = f"Number of batches to process: {len(docs_batches)}."
                 logger.info(log_message)
                 pipeline_status["latest_message"] = log_message
@@ -757,13 +756,15 @@
                 for batch_idx, docs_batch in enumerate(docs_batches):
                     # Update current batch in pipeline status (directly, as it's atomic)
                     pipeline_status["cur_batch"] = batch_idx + 1
-
+
                     async def batch(
                         batch_idx: int,
                         docs_batch: list[tuple[str, DocProcessingStatus]],
                         size_batch: int,
                     ) -> None:
-                        log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
+                        log_message = (
+                            f"Start processing batch {batch_idx + 1} of {size_batch}."
+                        )
                         logger.info(log_message)
                         pipeline_status["latest_message"] = log_message
                         pipeline_status["history_messages"].append(log_message)
@@ -822,7 +823,9 @@ class LightRAG:
                                     }
                                 )
                             except Exception as e:
-                                logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                                logger.error(
+                                    f"Failed to process document {doc_id}: {str(e)}"
+                                )
                                 await self.doc_status.upsert(
                                     {
                                         doc_id: {
@@ -837,7 +840,9 @@ class LightRAG:
                                     }
                                 )
                                 continue
-                        log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+                        log_message = (
+                            f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+                        )
                         logger.info(log_message)
                         pipeline_status["latest_message"] = log_message
                         pipeline_status["history_messages"].append(log_message)
@@ -846,7 +851,7 @@ class LightRAG:
 
                 await asyncio.gather(*batches)
                 await self._insert_done()
-
+
                 # Check if there's a pending request to process more documents (with lock)
                 has_pending_request = False
                 with storage_lock:
@@ -854,15 +859,15 @@ class LightRAG:
                     if has_pending_request:
                         # Clear the request flag before checking for more documents
                         pipeline_status["request_pending"] = False
-
+
                 if not has_pending_request:
                     break
-
+
                 log_message = "Processing additional documents due to pending request"
                 logger.info(log_message)
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
-
+
         finally:
             # Always reset busy status when done or if an exception occurs (with lock)
             with storage_lock:
@@ -901,12 +906,13 @@ class LightRAG:
             if storage_inst is not None
         ]
         await asyncio.gather(*tasks)
-
+
         log_message = "All Insert done"
         logger.info(log_message)
-
+
         # 获取 pipeline_status 并更新 latest_message 和 history_messages
         from lightrag.kg.shared_storage import get_namespace_data
+
         pipeline_status = get_namespace_data("pipeline_status")
         pipeline_status["latest_message"] = log_message
         pipeline_status["history_messages"].append(log_message)
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 44a68655..59dfb063 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -336,8 +336,9 @@ async def extract_entities(
     global_config: dict[str, str],
     llm_response_cache: BaseKVStorage | None = None,
 ) -> None:
-    # 在函数开始处添加获取 pipeline_status 的代码
+
     from lightrag.kg.shared_storage import get_namespace_data
+
     pipeline_status = get_namespace_data("pipeline_status")
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 3ec96112..5e579a06 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -75,50 +75,42 @@ def set_logger(log_file: str, level: int = logging.DEBUG):
         log_file: Path to the log file
         level: Logging level (e.g. logging.DEBUG, logging.INFO)
     """
-    # 设置日志级别
+
     logger.setLevel(level)
-
-    # 确保使用绝对路径
     log_file = os.path.abspath(log_file)
-
-    # 创建格式化器
     formatter = logging.Formatter(
         "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
     )
-
-    # 检查是否已经有文件处理器
     has_file_handler = False
     has_console_handler = False
-
-    # 检查现有处理器
+
     for handler in logger.handlers:
         if isinstance(handler, logging.FileHandler):
             has_file_handler = True
-        elif isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
+        elif isinstance(handler, logging.StreamHandler) and not isinstance(
+            handler, logging.FileHandler
+        ):
             has_console_handler = True
-
-    # 如果没有文件处理器,添加一个
+
     if not has_file_handler:
-        # 使用 RotatingFileHandler 代替 FileHandler
         from logging.handlers import RotatingFileHandler
+
         file_handler = RotatingFileHandler(
-            log_file,
-            maxBytes=10*1024*1024,  # 10MB
+            log_file,
+            maxBytes=10 * 1024 * 1024,  # 10MB
             backupCount=5,
-            encoding="utf-8"
+            encoding="utf-8",
         )
         file_handler.setLevel(level)
         file_handler.setFormatter(formatter)
         logger.addHandler(file_handler)
-
-    # 如果没有控制台处理器,添加一个
+
     if not has_console_handler:
         console_handler = logging.StreamHandler()
         console_handler.setLevel(level)
         console_handler.setFormatter(formatter)
         logger.addHandler(console_handler)
-
-    # 设置日志传播为 False,避免重复输出
+
     logger.propagate = False
diff --git a/run_with_gunicorn.py b/run_with_gunicorn.py
index de2b21b6..94cbdedf 100755
--- a/run_with_gunicorn.py
+++ b/run_with_gunicorn.py
@@ -5,9 +5,7 @@ Start LightRAG server with Gunicorn
 """
 
 import os
 import sys
-import json
 import signal
-import argparse
 from lightrag.api.utils_api import parse_args, display_splash_screen
 from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
@@ -34,7 +32,6 @@ def main():
 
     # Parse all arguments using parse_args
     args = parse_args(is_uvicorn_mode=False)
-
     # Display startup information
     display_splash_screen(args)
 
@@ -101,9 +98,15 @@ def main():
 
     # Set configuration variables in gunicorn_config
     gunicorn_config.workers = int(os.getenv("WORKERS", args.workers))
-    gunicorn_config.bind = f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
-    gunicorn_config.loglevel = args.log_level.lower() if args.log_level else os.getenv("LOG_LEVEL", "info")
-
+    gunicorn_config.bind = (
+        f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
+    )
+    gunicorn_config.loglevel = (
+        args.log_level.lower()
+        if args.log_level
+        else os.getenv("LOG_LEVEL", "info")
+    )
+
     # Set SSL configuration if enabled
     if args.ssl:
         gunicorn_config.certfile = args.ssl_certfile
@@ -121,10 +124,12 @@ def main():
                 value = getattr(gunicorn_config, key)
                 if callable(value):
                     self.cfg.set(key, value)
-
-            # 确保正确加载 logconfig_dict
-            if hasattr(gunicorn_config, 'logconfig_dict'):
-                self.cfg.set('logconfig_dict', getattr(gunicorn_config, 'logconfig_dict'))
+
+
+            if hasattr(gunicorn_config, "logconfig_dict"):
+                self.cfg.set(
+                    "logconfig_dict", getattr(gunicorn_config, "logconfig_dict")
+                )
 
         def load(self):
             # Import the application
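
Note on the queue-coordination logic reformatted in `apipeline_process_enqueue_documents` above: a single `busy` flag plus a `request_pending` flag in the shared `pipeline_status` namespace lets exactly one worker drain the document queue, while later callers simply leave a request and return. The sketch below illustrates that handshake in isolation; it is an illustrative reduction, not code from the patch — a plain `threading.Lock` and an ordinary dict stand in for LightRAG's `get_storage_lock()` and `get_namespace_data("pipeline_status")`, and `process_batch` is a hypothetical callback.

```python
import threading
from datetime import datetime

# Stand-ins for the shared storage used in the patch:
# get_namespace_data("pipeline_status") and get_storage_lock().
pipeline_status = {"busy": False, "request_pending": False, "history_messages": []}
storage_lock = threading.Lock()


def enqueue_or_process(process_batch) -> None:
    """Drain the queue if idle; otherwise leave a pending request and return."""
    with storage_lock:
        if pipeline_status.get("busy", False):
            # Another worker owns the queue: flag the request and bail out.
            pipeline_status["request_pending"] = True
            return
        pipeline_status.update(
            {"busy": True, "job_start": datetime.now().isoformat()}
        )

    try:
        while True:
            process_batch()
            # If more work was requested while we were busy, clear the flag
            # and loop again; otherwise stop.
            with storage_lock:
                if not pipeline_status.get("request_pending", False):
                    break
                pipeline_status["request_pending"] = False
    finally:
        # Always release the queue, even if process_batch raised.
        with storage_lock:
            pipeline_status["busy"] = False


if __name__ == "__main__":
    enqueue_or_process(lambda: print("processing one pass over the queue"))
```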