From 7aec78833cb219bd95b4fe46074674955e8feb7e Mon Sep 17 00:00:00 2001
From: yangdx
Date: Thu, 27 Feb 2025 13:25:22 +0800
Subject: [PATCH] Implement Gunicorn+Uvicorn integration for shared data
 preloading

- Create run_with_gunicorn.py script to properly initialize shared data
  in the main process before forking worker processes
- Revert uvicorn to single-process mode only, and let gunicorn do all
  the multi-process work
---
 gunicorn_config.py              |  80 +++++++++++++++
 lightrag/api/lightrag_server.py |  23 ++++-
 lightrag/kg/shared_storage.py   | 113 +++++++++++++++++----
 run_with_gunicorn.py            | 172 ++++++++++++++++++++++++++++++++
 4 files changed, 365 insertions(+), 23 deletions(-)
 create mode 100644 gunicorn_config.py
 create mode 100755 run_with_gunicorn.py

diff --git a/gunicorn_config.py b/gunicorn_config.py
new file mode 100644
index 00000000..8c1b22bf
--- /dev/null
+++ b/gunicorn_config.py
@@ -0,0 +1,80 @@
+# gunicorn_config.py
+import os
+import multiprocessing
+from lightrag.kg.shared_storage import finalize_share_data
+from lightrag.api.utils_api import parse_args
+
+# Parse command line arguments
+args = parse_args()
+
+# Determine worker count - from environment variable or command line arguments
+workers = int(os.getenv('WORKERS', args.workers))
+
+# If not specified, use CPU count * 2 + 1 (Gunicorn recommended configuration)
+if workers <= 1:
+    workers = multiprocessing.cpu_count() * 2 + 1
+
+# Binding address
+bind = f"{os.getenv('HOST', args.host)}:{os.getenv('PORT', args.port)}"
+
+# Enable preload_app option
+preload_app = True
+
+# Use Uvicorn worker
+worker_class = "uvicorn.workers.UvicornWorker"
+
+# Other Gunicorn configurations
+timeout = int(os.getenv('TIMEOUT', 120))
+keepalive = 5
+
+# Optional SSL configuration
+if args.ssl:
+    certfile = args.ssl_certfile
+    keyfile = args.ssl_keyfile
+
+# Logging configuration
+errorlog = os.getenv('ERROR_LOG', '-')  # '-' means stderr
+accesslog = os.getenv('ACCESS_LOG', '-')  # '-' means stderr
+loglevel = os.getenv('LOG_LEVEL', 'info')
+
+def on_starting(server):
+    """
+    Executed when Gunicorn starts, before the first worker processes are forked.
+    Use this hook to perform initialization tasks shared by all worker processes.
+    """
+    print("=" * 80)
+    print(f"GUNICORN MASTER PROCESS: on_starting jobs for all {workers} workers")
+    print(f"Process ID: {os.getpid()}")
+    print("=" * 80)
+
+    # Memory usage monitoring
+    try:
+        import psutil
+        process = psutil.Process(os.getpid())
+        memory_info = process.memory_info()
+        msg = f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
+        print(msg)
+    except ImportError:
+        print("psutil not installed, skipping memory usage reporting")
+
+    print("=" * 80)
+    print("Gunicorn initialization complete, forking workers...")
+    print("=" * 80)
+
+
+def on_exit(server):
+    """
+    Executed when Gunicorn is shutting down.
+    This is a good place to release shared resources.
+    """
+    print("=" * 80)
+    print("GUNICORN MASTER PROCESS: Shutting down")
+    print(f"Process ID: {os.getpid()}")
+    print("=" * 80)
+
+    # Release shared resources
+    finalize_share_data()
+
+    print("=" * 80)
+    print("Gunicorn shutdown complete")
+    print("=" * 80)
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 3af8887d..a9c9ab04 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -483,17 +483,28 @@ def main():
     display_splash_screen(args)
 
+    # Check if running under Gunicorn
+    if 'GUNICORN_CMD_ARGS' in os.environ:
+        # If started with Gunicorn, return directly as Gunicorn will call get_application
+        print("Running under Gunicorn - worker management handled by Gunicorn")
+        return
+
+    # If not running under Gunicorn, initialize shared data here
     from lightrag.kg.shared_storage import initialize_share_data
-    initialize_share_data(args.workers)
-
+    print("Starting in single-process mode")
+    initialize_share_data(1)  # Force single process mode
+
+    # Create application instance directly instead of using factory function
+    app = create_app(args)
+
+    # Start Uvicorn in single process mode
     uvicorn_config = {
-        "app": "lightrag.api.lightrag_server:get_application",
-        "factory": True,
+        "app": app,  # Pass application instance directly instead of string path
         "host": args.host,
         "port": args.port,
-        "workers": args.workers,
         "log_config": None,  # Disable default config
     }
+
 
     if args.ssl:
         uvicorn_config.update(
             {
@@ -501,6 +512,8 @@ def main():
                 "ssl_keyfile": args.ssl_keyfile,
             }
         )
+
+    print(f"Starting Uvicorn server in single-process mode on {args.host}:{args.port}")
     uvicorn.run(**uvicorn_config)
 
 
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 6b5c07f6..8dc9e1a9 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -1,10 +1,19 @@
 import os
+import sys
 from multiprocessing.synchronize import Lock as ProcessLock
 from threading import Lock as ThreadLock
 from multiprocessing import Manager
 from typing import Any, Dict, Optional, Union
 from lightrag.utils import logger
 
+# Define a direct print function for critical logs that must be visible in all processes
+def direct_log(message, level="INFO"):
+    """
+    Log a message directly to stderr to ensure visibility in all processes,
+    including the Gunicorn master process.
+    """
+    print(f"{level}: {message}", file=sys.stderr, flush=True)
+
 LockType = Union[ProcessLock, ThreadLock]
 
 _manager = None
@@ -21,41 +30,60 @@ _global_lock: Optional[LockType] = None
 
 
 def initialize_share_data(workers: int = 1):
-    """Initialize storage data"""
-    global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
+    """
+    Initialize shared storage data for single or multi-process mode.
+    When used with Gunicorn's preload feature, this function is called once in the
+    master process before forking worker processes, allowing all workers to share
+    the same initialized data.
+
+    In single-process mode, this function is called during LightRAG object initialization.
+
+    The function determines whether to use cross-process shared variables for data storage
+    based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
+    If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
+
+    Args:
+        workers (int): Number of worker processes. If 1, single-process mode is used.
+                       If > 1, multi-process mode with shared memory is used.
+    """
+    global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
+
+    # Check if already initialized
     if _initialized and _initialized.value:
         is_multiprocess = _is_multiprocess.value
-        if _is_multiprocess.value:
-            logger.info(f"Process {os.getpid()} storage data already initialized!")
-            return
-
+        direct_log(f"Process {os.getpid()} storage data already initialized (multiprocess={_is_multiprocess.value})!")
+        return
+
     _manager = Manager()
     _initialized = _manager.Value("b", False)
     _is_multiprocess = _manager.Value("b", False)
 
-    if workers == 1:
-        _is_multiprocess.value = False
-        _global_lock = ThreadLock()
-        _shared_dicts = {}
-        _share_objects = {}
-        _init_flags = {}
-        logger.info(f"Process {os.getpid()} storage data created for Single Process")
-    else:
+    # Force multi-process mode if workers > 1
+    if workers > 1:
         _is_multiprocess.value = True
         _global_lock = _manager.Lock()
         # Create shared dictionaries with manager
         _shared_dicts = _manager.dict()
         _share_objects = _manager.dict()
-        _init_flags = _manager.dict()  # 使用共享字典存储初始化标志
-        logger.info(f"Process {os.getpid()} storage data created for Multiple Process")
+        _init_flags = _manager.dict()  # Use shared dictionary to store initialization flags
+        direct_log(f"Process {os.getpid()} storage data created for Multiple Process (workers={workers})")
+    else:
+        _is_multiprocess.value = False
+        _global_lock = ThreadLock()
+        _shared_dicts = {}
+        _share_objects = {}
+        _init_flags = {}
+        direct_log(f"Process {os.getpid()} storage data created for Single Process")
 
+    # Mark as initialized
+    _initialized.value = True
     is_multiprocess = _is_multiprocess.value
 
 
 def try_initialize_namespace(namespace: str) -> bool:
     """
-    尝试初始化命名空间。返回True表示当前进程获得了初始化权限。
-    使用共享字典的原子操作确保只有一个进程能成功初始化。
+    Try to initialize a namespace. Returns True if the current process gets initialization permission.
+    Uses atomic operations on shared dictionaries to ensure only one process can successfully initialize.
     """
     global _init_flags, _manager
 
@@ -126,3 +154,52 @@ def get_namespace_data(namespace: str) -> Dict[str, Any]:
 def get_scan_progress() -> Dict[str, Any]:
     """get storage space for document scanning progress data"""
     return get_namespace_data("scan_progress")
+
+
+def finalize_share_data():
+    """
+    Release shared resources and clean up.
+
+    This function should be called when the application is shutting down
+    to properly release shared resources and avoid memory leaks.
+
+    In multi-process mode, it shuts down the Manager and releases all shared objects.
+    In single-process mode, it simply resets the global variables.
+    """
+    global _manager, _is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
+
+    # Check if already initialized
+    if not (_initialized and _initialized.value):
+        direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize")
+        return
+
+    direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess.value})")
+
+    # In multi-process mode, shut down the Manager
+    if _is_multiprocess.value and _manager is not None:
+        try:
+            # Clear shared dictionaries first
+            if _shared_dicts is not None:
+                _shared_dicts.clear()
+            if _share_objects is not None:
+                _share_objects.clear()
+            if _init_flags is not None:
+                _init_flags.clear()
+
+            # Shut down the Manager
+            _manager.shutdown()
+            direct_log(f"Process {os.getpid()} Manager shutdown complete")
+        except Exception as e:
+            direct_log(f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR")
+
+    # Reset global variables
+    _manager = None
+    _initialized = None
+    _is_multiprocess = None
+    is_multiprocess = None
+    _shared_dicts = None
+    _share_objects = None
+    _init_flags = None
+    _global_lock = None
+
+    direct_log(f"Process {os.getpid()} storage data finalization complete")
diff --git a/run_with_gunicorn.py b/run_with_gunicorn.py
new file mode 100755
index 00000000..44a49e93
--- /dev/null
+++ b/run_with_gunicorn.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+"""
+Start LightRAG server with Gunicorn
+"""
+import os
+import sys
+import json
+import signal
+import argparse
+import subprocess
+from lightrag.api.utils_api import parse_args, display_splash_screen
+from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
+
+# Signal handler for graceful shutdown
+def signal_handler(sig, frame):
+    print("\n\n" + "=" * 80)
+    print("RECEIVED TERMINATION SIGNAL")
+    print(f"Process ID: {os.getpid()}")
+    print("=" * 80 + "\n")
+
+    # Release shared resources
+    finalize_share_data()
+
+    # Exit with success status
+    sys.exit(0)
+
+def main():
+    # Register signal handlers for graceful shutdown
+    signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
+    signal.signal(signal.SIGTERM, signal_handler)  # kill command
+    # Create a parser to handle Gunicorn-specific parameters
+    parser = argparse.ArgumentParser(
+        description="Start LightRAG server with Gunicorn"
+    )
+    parser.add_argument(
+        "--workers",
+        type=int,
+        help="Number of worker processes (overrides the default or config.ini setting)"
+    )
+    parser.add_argument(
+        "--timeout",
+        type=int,
+        help="Worker timeout in seconds (default: 120)"
+    )
+    parser.add_argument(
+        "--log-level",
+        choices=["debug", "info", "warning", "error", "critical"],
+        help="Gunicorn log level"
+    )
+
+    # Parse Gunicorn-specific arguments
+    gunicorn_args, remaining_args = parser.parse_known_args()
+
+    # Pass remaining arguments to LightRAG's parse_args
+    sys.argv = [sys.argv[0]] + remaining_args
+    args = parse_args()
+
+    # If workers specified, override args value
+    if gunicorn_args.workers:
+        args.workers = gunicorn_args.workers
+        os.environ["WORKERS"] = str(gunicorn_args.workers)
+
+    # If timeout specified, set environment variable
+    if gunicorn_args.timeout:
+        os.environ["TIMEOUT"] = str(gunicorn_args.timeout)
+
+    # If log-level specified, set environment variable
+    if gunicorn_args.log_level:
+        os.environ["LOG_LEVEL"] = gunicorn_args.log_level
+
+    # Save all LightRAG args to an environment variable for worker processes
+    # This is the key step for passing arguments to lightrag_server.py
+    os.environ["LIGHTRAG_ARGS"] = json.dumps(vars(args))
+
+    # Display startup information
+    display_splash_screen(args)
+
+    print("🚀 Starting LightRAG with Gunicorn")
+    print(f"🔄 Worker management: Gunicorn (workers={args.workers})")
+    print("🔍 Preloading app: Enabled")
+    print("📝 Note: Using Gunicorn's preload feature for shared data initialization")
+    print("\n\n" + "=" * 80)
+    print("MAIN PROCESS INITIALIZATION")
+    print(f"Process ID: {os.getpid()}")
+    print(f"Workers setting: {args.workers}")
+    print("=" * 80 + "\n")
+
+    # Start application with Gunicorn using the direct Python API
+    # Ensure the WORKERS environment variable is set before importing gunicorn_config
+    if args.workers > 1:
+        os.environ["WORKERS"] = str(args.workers)
+
+    # Import Gunicorn's BaseApplication
+    from gunicorn.app.base import BaseApplication
+
+    # Define a custom application class that loads our config
+    class GunicornApp(BaseApplication):
+        def __init__(self, app, options=None):
+            self.options = options or {}
+            self.application = app
+            super().__init__()
+
+        def load_config(self):
+            # Define valid Gunicorn configuration options
+            valid_options = {
+                'bind', 'workers', 'worker_class', 'timeout', 'keepalive',
+                'preload_app', 'errorlog', 'accesslog', 'loglevel',
+                'certfile', 'keyfile', 'limit_request_line', 'limit_request_fields',
+                'limit_request_field_size', 'graceful_timeout', 'max_requests',
+                'max_requests_jitter'
+            }
+
+            # Special hooks that need to be set separately
+            special_hooks = {
+                'on_starting', 'on_reload', 'on_exit', 'pre_fork', 'post_fork',
+                'pre_exec', 'pre_request', 'post_request', 'worker_init',
+                'worker_exit', 'nworkers_changed', 'child_exit'
+            }
+
+            # Import the gunicorn_config module directly
+            import importlib.util
+            spec = importlib.util.spec_from_file_location("gunicorn_config", "gunicorn_config.py")
+            self.config_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(self.config_module)
+
+            # Set configuration options
+            for key in dir(self.config_module):
+                if key in valid_options:
+                    value = getattr(self.config_module, key)
+                    # Skip callables such as on_starting; only plain values are valid here
+                    if not callable(value):
+                        self.cfg.set(key, value)
+                # Set special hooks
+                elif key in special_hooks:
+                    value = getattr(self.config_module, key)
+                    if callable(value):
+                        self.cfg.set(key, value)
+
+            # Override with command line arguments if provided
+            if gunicorn_args.workers:
+                self.cfg.set("workers", gunicorn_args.workers)
+            if gunicorn_args.timeout:
+                self.cfg.set("timeout", gunicorn_args.timeout)
+            if gunicorn_args.log_level:
+                self.cfg.set("loglevel", gunicorn_args.log_level)
+
+        def load(self):
+            # Import the application factory
+            from lightrag.api.lightrag_server import get_application
+            return get_application()
+
+    # Create the application
+    app = GunicornApp("")
+
+    # Directly call initialize_share_data with the correct workers value
+    from lightrag.kg.shared_storage import initialize_share_data
+
+    # Force workers to be an integer and greater than 1 for multi-process mode
+    workers_count = int(args.workers)
+    if workers_count > 1:
+        # Set a flag to indicate we're in the main process
+        os.environ["LIGHTRAG_MAIN_PROCESS"] = "1"
+        initialize_share_data(workers_count)
+    else:
+        initialize_share_data(1)
+
+    # Run the application
+    print("\nStarting Gunicorn with direct Python API...")
+    app.run()
+
+if __name__ == "__main__":
+    main()
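
Note (not part of the patch): run_with_gunicorn.py serializes the parsed LightRAG arguments into the
LIGHTRAG_ARGS environment variable before Gunicorn forks its workers, but the consuming side in
lightrag_server.py is not included in this diff. A minimal sketch of how a worker process could restore
those arguments follows; load_worker_args is a hypothetical helper name used only for illustration,
not an existing LightRAG function.

    import argparse
    import json
    import os

    def load_worker_args() -> argparse.Namespace:
        # Rebuild the namespace that run_with_gunicorn.py stored in LIGHTRAG_ARGS (illustration only).
        raw = os.environ.get("LIGHTRAG_ARGS")
        if raw is None:
            # The variable is only set when the server is started via run_with_gunicorn.py
            raise RuntimeError("LIGHTRAG_ARGS is not set; start the server via run_with_gunicorn.py")
        return argparse.Namespace(**json.loads(raw))

With these changes the multi-process path is launched through the new script, for example
"python run_with_gunicorn.py --workers 4"; any extra flags are passed through to LightRAG's own
parse_args, while running lightrag_server.py directly keeps Uvicorn in single-process mode.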