From 15a6a9cf7c4be8eecf33d9c4dae9ac68ea0e4302 Mon Sep 17 00:00:00 2001 From: yangdx Date: Wed, 26 Feb 2025 12:23:35 +0800 Subject: [PATCH] fix: log filtering void when uvicorn wokers is greater than 1 - Centralize logging setup - Fix logger propagation issues --- lightrag/api/lightrag_server.py | 86 +++++++++++++++++++++------------ lightrag/utils.py | 29 ++++++----- 2 files changed, 71 insertions(+), 44 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 65227e97..56f55833 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -30,7 +30,6 @@ from lightrag import LightRAG from lightrag.types import GPTKeywordExtractionFormat from lightrag.api import __api_version__ from lightrag.utils import EmbeddingFunc -from lightrag.utils import logger from .routers.document_routes import ( DocumentManager, create_document_routes, @@ -40,36 +39,40 @@ from .routers.query_routes import create_query_routes from .routers.graph_routes import create_graph_routes from .routers.ollama_api import OllamaAPI +from lightrag.utils import logger as utils_logger + # Load environment variables try: load_dotenv(override=True) except Exception as e: - logger.warning(f"Failed to load .env file: {e}") + utils_logger.warning(f"Failed to load .env file: {e}") # Initialize config parser config = configparser.ConfigParser() config.read("config.ini") -class AccessLogFilter(logging.Filter): +class LightragPathFilter(logging.Filter): + """Filter for lightrag logger to filter out frequent path access logs""" def __init__(self): super().__init__() # Define paths to be filtered self.filtered_paths = ["/documents", "/health", "/webui/"] - + def filter(self, record): try: + # Check if record has the required attributes for an access log if not hasattr(record, "args") or not isinstance(record.args, tuple): return True if len(record.args) < 5: return True + # Extract method, path and status from the record args method = record.args[1] path = record.args[2] status = record.args[4] - # print(f"Debug - Method: {method}, Path: {path}, Status: {status}") - # print(f"Debug - Filtered paths: {self.filtered_paths}") + # Filter out successful GET requests to filtered paths if ( method == "GET" and (status == 200 or status == 304) @@ -78,17 +81,23 @@ class AccessLogFilter(logging.Filter): return False return True - except Exception: + # In case of any error, let the message through return True def create_app(args): # Initialize verbose debug setting - from lightrag.utils import set_verbose_debug - + # Can not use the logger at the top of this module when workers > 1 + from lightrag.utils import set_verbose_debug, logger + # Setup logging + logger.setLevel(getattr(logging, args.log_level)) set_verbose_debug(args.verbose) + # Display splash screen + from lightrag.kg.shared_storage import is_multiprocess + logger.info(f"==== Multi-processor mode: {is_multiprocess} ====") + # Verify that bindings are correctly setup if args.llm_binding not in [ "lollms", @@ -120,11 +129,6 @@ def create_app(args): if not os.path.exists(args.ssl_keyfile): raise Exception(f"SSL key file not found: {args.ssl_keyfile}") - # Setup logging - logging.basicConfig( - format="%(levelname)s:%(message)s", level=getattr(logging, args.log_level) - ) - # Check if API key is provided either through env var or args api_key = os.getenv("LIGHTRAG_API_KEY") or args.key @@ -406,6 +410,9 @@ def create_app(args): def get_application(): """Factory function for creating the FastAPI application""" + # Configure logging for this worker process + configure_logging() + # Get args from environment variable args_json = os.environ.get('LIGHTRAG_ARGS') if not args_json: @@ -414,24 +421,22 @@ def get_application(): import types args = types.SimpleNamespace(**json.loads(args_json)) + # if args.workers > 1: + # from lightrag.kg.shared_storage import initialize_manager + # initialize_manager() + return create_app(args) -def main(): - from multiprocessing import freeze_support - freeze_support() +def configure_logging(): + """Configure logging for both uvicorn and lightrag""" + # Reset any existing handlers to ensure clean configuration + for logger_name in ["uvicorn.access", "lightrag"]: + logger = logging.getLogger(logger_name) + logger.handlers = [] + logger.filters = [] - args = parse_args() - # Save args to environment variable for child processes - os.environ['LIGHTRAG_ARGS'] = json.dumps(vars(args)) - - if args.workers > 1: - from lightrag.kg.shared_storage import initialize_manager - initialize_manager() - import lightrag.kg.shared_storage as shared_storage - shared_storage.is_multiprocess = True - - # Configure uvicorn logging + # Configure basic logging logging.config.dictConfig({ "version": 1, "disable_existing_loggers": False, @@ -452,13 +457,32 @@ def main(): "handlers": ["default"], "level": "INFO", "propagate": False, + "filters": ["path_filter"], + }, + "lightrag": { + "handlers": ["default"], + "level": "INFO", + "propagate": False, + "filters": ["path_filter"], + }, + }, + "filters": { + "path_filter": { + "()": "lightrag.api.lightrag_server.LightragPathFilter", }, }, }) - # Add filter to uvicorn access logger - uvicorn_access_logger = logging.getLogger("uvicorn.access") - uvicorn_access_logger.addFilter(AccessLogFilter()) +def main(): + from multiprocessing import freeze_support + freeze_support() + + args = parse_args() + # Save args to environment variable for child processes + os.environ['LIGHTRAG_ARGS'] = json.dumps(vars(args)) + + # Configure logging before starting uvicorn + configure_logging() display_splash_screen(args) diff --git a/lightrag/utils.py b/lightrag/utils.py index e7217def..bc78e2cb 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -55,22 +55,13 @@ def set_verbose_debug(enabled: bool): global VERBOSE_DEBUG VERBOSE_DEBUG = enabled - -class UnlimitedSemaphore: - """A context manager that allows unlimited access.""" - - async def __aenter__(self): - pass - - async def __aexit__(self, exc_type, exc, tb): - pass - - -ENCODER = None - statistic_data = {"llm_call": 0, "llm_cache": 0, "embed_call": 0} +# Initialize logger logger = logging.getLogger("lightrag") +logger.propagate = False # prevent log message send to root loggger +# Let the main application configure the handlers +logger.setLevel(logging.INFO) # Set httpx logging level to WARNING logging.getLogger("httpx").setLevel(logging.WARNING) @@ -97,6 +88,18 @@ def set_logger(log_file: str, level: int = logging.DEBUG): logger.addHandler(file_handler) +class UnlimitedSemaphore: + """A context manager that allows unlimited access.""" + + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc, tb): + pass + + +ENCODER = None + @dataclass class EmbeddingFunc: embedding_dim: int