From 157ec862aec115e374775523acaf05b35071dda3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 28 Feb 2025 14:57:25 +0800 Subject: [PATCH] Enhance logging system with file rotation and unified configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Unify logging across Gunicorn and Uvicorn • Add rotating file handlers --- gunicorn_config.py | 60 +++++++++++++++++++++++- lightrag/api/lightrag_server.py | 37 +++++++++++++-- lightrag/api/routers/document_routes.py | 62 ++++++++++++------------- lightrag/utils.py | 47 ++++++++++++++++--- run_with_gunicorn.py | 4 ++ 5 files changed, 166 insertions(+), 44 deletions(-) diff --git a/gunicorn_config.py b/gunicorn_config.py index 9cdb18e8..fdb0140a 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -1,5 +1,8 @@ # gunicorn_config.py import os +import logging +from logging.config import dictConfig +from logging.handlers import RotatingFileHandler from lightrag.kg.shared_storage import finalize_share_data from lightrag.api.utils_api import parse_args @@ -27,11 +30,64 @@ if args.ssl: certfile = args.ssl_certfile keyfile = args.ssl_keyfile +# Get the log file path +log_file_path = os.path.abspath(os.path.join(os.getcwd(), "lightrag.log")) + # Logging configuration -errorlog = os.getenv("ERROR_LOG", "-") # '-' means stderr -accesslog = os.getenv("ACCESS_LOG", "-") # '-' means stderr +errorlog = os.getenv("ERROR_LOG", log_file_path) # defaults to lightrag.log +accesslog = os.getenv("ACCESS_LOG", log_file_path) # defaults to lightrag.log loglevel = os.getenv("LOG_LEVEL", "info") +# Configure the logging system +logconfig_dict = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'standard': { + 'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s' }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'level': 'INFO', + 'formatter': 'standard', + 'stream': 'ext://sys.stdout' + }, + 'file': { + 'class': 'logging.handlers.RotatingFileHandler', + 'level': 'INFO', + 
'formatter': 'standard', + 'filename': log_file_path, + 'maxBytes': 10485760, # 10MB + 'backupCount': 5, + 'encoding': 'utf8' + } + }, + 'loggers': { + 'lightrag': { + 'handlers': ['console', 'file'], + 'level': 'INFO', + 'propagate': False + }, + 'uvicorn': { + 'handlers': ['console', 'file'], + 'level': 'INFO', + 'propagate': False + }, + 'gunicorn': { + 'handlers': ['console', 'file'], + 'level': 'INFO', + 'propagate': False + }, + 'gunicorn.error': { + 'handlers': ['console', 'file'], + 'level': 'INFO', + 'propagate': False + } + } +} + def on_starting(server): """ diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 9af1a90e..66fcacde 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -438,13 +438,20 @@ def get_application(): def configure_logging(): """Configure logging for both uvicorn and lightrag""" + # Check if running under Gunicorn + if "GUNICORN_CMD_ARGS" in os.environ: + # If started with Gunicorn, return directly as Gunicorn will handle logging + return + # Reset any existing handlers to ensure clean configuration - for logger_name in ["uvicorn.access", "lightrag"]: + for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "lightrag"]: logger = logging.getLogger(logger_name) logger.handlers = [] logger.filters = [] # Configure basic logging + log_file_path = os.path.abspath(os.path.join(os.getcwd(), "lightrag.log")) + logging.config.dictConfig( { "version": 1, @@ -453,23 +460,45 @@ def configure_logging(): "default": { "format": "%(levelname)s: %(message)s", }, + "detailed": { + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + }, }, "handlers": { - "default": { + "console": { "formatter": "default", "class": "logging.StreamHandler", "stream": "ext://sys.stderr", }, + "file": { + "formatter": "detailed", + "class": "logging.handlers.RotatingFileHandler", + "filename": log_file_path, + "maxBytes": 10*1024*1024, # 10MB + "backupCount": 5, + "encoding": 
"utf-8", + }, }, "loggers": { + # Configure all uvicorn related loggers + "uvicorn": { + "handlers": ["console", "file"], + "level": "INFO", + "propagate": False, + }, "uvicorn.access": { - "handlers": ["default"], + "handlers": ["console", "file"], "level": "INFO", "propagate": False, "filters": ["path_filter"], }, + "uvicorn.error": { + "handlers": ["console", "file"], + "level": "INFO", + "propagate": False, + }, "lightrag": { - "handlers": ["default"], + "handlers": ["console", "file"], "level": "INFO", "propagate": False, "filters": ["path_filter"], diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index e274f4c4..3126b8ce 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -3,7 +3,7 @@ This module contains all document-related routes for the LightRAG API. """ import asyncio -import logging +from lightrag.utils import logger import aiofiles import shutil import traceback @@ -147,7 +147,7 @@ class DocumentManager: """Scan input directory for new files""" new_files = [] for ext in self.supported_extensions: - logging.debug(f"Scanning for {ext} files in {self.input_dir}") + logger.debug(f"Scanning for {ext} files in {self.input_dir}") for file_path in self.input_dir.rglob(f"*{ext}"): if file_path not in self.indexed_files: new_files.append(file_path) @@ -266,7 +266,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: ) content += "\n" case _: - logging.error( + logger.error( f"Unsupported file type: {file_path.name} (extension {ext})" ) return False @@ -274,20 +274,20 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: # Insert into the RAG queue if content: await rag.apipeline_enqueue_documents(content) - logging.info(f"Successfully fetched and enqueued file: {file_path.name}") + logger.info(f"Successfully fetched and enqueued file: {file_path.name}") return True else: - logging.error(f"No content could be extracted from 
file: {file_path.name}") + logger.error(f"No content could be extracted from file: {file_path.name}") except Exception as e: - logging.error(f"Error processing or enqueueing file {file_path.name}: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error processing or enqueueing file {file_path.name}: {str(e)}") + logger.error(traceback.format_exc()) finally: if file_path.name.startswith(temp_prefix): try: file_path.unlink() except Exception as e: - logging.error(f"Error deleting file {file_path}: {str(e)}") + logger.error(f"Error deleting file {file_path}: {str(e)}") return False @@ -303,8 +303,8 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path): await rag.apipeline_process_enqueue_documents() except Exception as e: - logging.error(f"Error indexing file {file_path.name}: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error indexing file {file_path.name}: {str(e)}") + logger.error(traceback.format_exc()) async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]): @@ -328,8 +328,8 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]): if enqueued: await rag.apipeline_process_enqueue_documents() except Exception as e: - logging.error(f"Error indexing files: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error indexing files: {str(e)}") + logger.error(traceback.format_exc()) async def pipeline_index_texts(rag: LightRAG, texts: List[str]): @@ -373,16 +373,16 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): try: new_files = doc_manager.scan_directory_for_new_files() total_files = len(new_files) - logging.info(f"Found {total_files} new files to index.") + logger.info(f"Found {total_files} new files to index.") for idx, file_path in enumerate(new_files): try: await pipeline_index_file(rag, file_path) except Exception as e: - logging.error(f"Error indexing file {file_path}: {str(e)}") + logger.error(f"Error indexing file {file_path}: 
{str(e)}") except Exception as e: - logging.error(f"Error during scanning process: {str(e)}") + logger.error(f"Error during scanning process: {str(e)}") def create_document_routes( @@ -447,8 +447,8 @@ def create_document_routes( message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.", ) except Exception as e: - logging.error(f"Error /documents/upload: {file.filename}: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error /documents/upload: {file.filename}: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -480,8 +480,8 @@ def create_document_routes( message="Text successfully received. Processing will continue in background.", ) except Exception as e: - logging.error(f"Error /documents/text: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error /documents/text: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -515,8 +515,8 @@ def create_document_routes( message="Text successfully received. Processing will continue in background.", ) except Exception as e: - logging.error(f"Error /documents/text: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error /documents/text: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -558,8 +558,8 @@ def create_document_routes( message=f"File '{file.filename}' saved successfully. 
Processing will continue in background.", ) except Exception as e: - logging.error(f"Error /documents/file: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error /documents/file: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.post( @@ -621,8 +621,8 @@ def create_document_routes( return InsertResponse(status=status, message=status_message) except Exception as e: - logging.error(f"Error /documents/batch: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error /documents/batch: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.delete( @@ -649,8 +649,8 @@ def create_document_routes( status="success", message="All documents cleared successfully" ) except Exception as e: - logging.error(f"Error DELETE /documents: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error DELETE /documents: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)]) @@ -682,8 +682,8 @@ def create_document_routes( return status_dict except Exception as e: - logging.error(f"Error getting pipeline status: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error getting pipeline status: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @router.get("", dependencies=[Depends(optional_api_key)]) @@ -739,8 +739,8 @@ def create_document_routes( ) return response except Exception as e: - logging.error(f"Error GET /documents: {str(e)}") - logging.error(traceback.format_exc()) + logger.error(f"Error GET /documents: {str(e)}") + logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) return router diff --git a/lightrag/utils.py b/lightrag/utils.py index a6265048..3ec96112 100644 --- a/lightrag/utils.py +++ 
b/lightrag/utils.py @@ -75,18 +75,51 @@ def set_logger(log_file: str, level: int = logging.DEBUG): log_file: Path to the log file level: Logging level (e.g. logging.DEBUG, logging.INFO) """ + # Set the log level logger.setLevel(level) - - file_handler = logging.FileHandler(log_file, encoding="utf-8") - file_handler.setLevel(level) - + + # Ensure an absolute path is used + log_file = os.path.abspath(log_file) + + # Create the formatter formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) - file_handler.setFormatter(formatter) - - if not logger.handlers: + + # Check whether a file handler already exists + has_file_handler = False + has_console_handler = False + + # Inspect the existing handlers + for handler in logger.handlers: + if isinstance(handler, logging.FileHandler): + has_file_handler = True + elif isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): + has_console_handler = True + + # Add a file handler if none exists + if not has_file_handler: + # Use RotatingFileHandler instead of FileHandler + from logging.handlers import RotatingFileHandler + file_handler = RotatingFileHandler( + log_file, + maxBytes=10*1024*1024, # 10MB + backupCount=5, + encoding="utf-8" + ) + file_handler.setLevel(level) + file_handler.setFormatter(formatter) logger.addHandler(file_handler) + + # Add a console handler if none exists + if not has_console_handler: + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + # Disable propagation to avoid duplicate output + logger.propagate = False class UnlimitedSemaphore: diff --git a/run_with_gunicorn.py b/run_with_gunicorn.py index 705cb88f..7b98cb1c 100755 --- a/run_with_gunicorn.py +++ b/run_with_gunicorn.py @@ -157,6 +157,10 @@ def main(): value = getattr(self.config_module, key) if callable(value): self.cfg.set(key, value) + + # Ensure logconfig_dict is loaded correctly + if hasattr(self.config_module, 'logconfig_dict'): + self.cfg.set('logconfig_dict', getattr(self.config_module, 'logconfig_dict')) # Override with command line arguments if provided 
if gunicorn_args.workers: