diff --git a/gunicorn_config.py b/gunicorn_config.py index f4c9178e..7239acd9 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -8,7 +8,7 @@ from lightrag.api.utils_api import parse_args args = parse_args() # Determine worker count - from environment variable or command line arguments -workers = int(os.getenv('WORKERS', args.workers)) +workers = int(os.getenv("WORKERS", args.workers)) # If not specified, use CPU count * 2 + 1 (Gunicorn recommended configuration) if workers <= 1: @@ -24,7 +24,7 @@ preload_app = True worker_class = "uvicorn.workers.UvicornWorker" # Other Gunicorn configurations -timeout = int(os.getenv('TIMEOUT', 120)) +timeout = int(os.getenv("TIMEOUT", 120)) keepalive = 5 # Optional SSL configuration @@ -33,9 +33,10 @@ if args.ssl: keyfile = args.ssl_keyfile # Logging configuration -errorlog = os.getenv('ERROR_LOG', '-') # '-' means stderr -accesslog = os.getenv('ACCESS_LOG', '-') # '-' means stderr -loglevel = os.getenv('LOG_LEVEL', 'info') +errorlog = os.getenv("ERROR_LOG", "-") # '-' means stderr +accesslog = os.getenv("ACCESS_LOG", "-") # '-' means stderr +loglevel = os.getenv("LOG_LEVEL", "info") + def on_starting(server): """ @@ -46,21 +47,25 @@ def on_starting(server): print(f"GUNICORN MASTER PROCESS: on_starting jobs for all {workers} workers") print(f"Process ID: {os.getpid()}") print("=" * 80) - + # Memory usage monitoring try: import psutil + process = psutil.Process(os.getpid()) memory_info = process.memory_info() - msg = f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB" + msg = ( + f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB" + ) print(msg) except ImportError: print("psutil not installed, skipping memory usage reporting") - + print("=" * 80) print("Gunicorn initialization complete, forking workers...") print("=" * 80) + def on_exit(server): """ Executed when Gunicorn is shutting down. @@ -70,10 +75,10 @@ def on_exit(server): print("GUNICORN MASTER PROCESS: Shutting down") print(f"Process ID: {os.getpid()}") print("=" * 80) - + # Release shared resources finalize_share_data() - + print("=" * 80) print("Gunicorn shutdown complete") print("=" * 80) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 9f162290..155e22f5 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -471,12 +471,13 @@ def configure_logging(): def main(): # Check if running under Gunicorn - if 'GUNICORN_CMD_ARGS' in os.environ: + if "GUNICORN_CMD_ARGS" in os.environ: # If started with Gunicorn, return directly as Gunicorn will call get_application print("Running under Gunicorn - worker management handled by Gunicorn") return from multiprocessing import freeze_support + freeze_support() args = parse_args() @@ -487,10 +488,10 @@ def main(): configure_logging() display_splash_screen(args) - + # Create application instance directly instead of using factory function app = create_app(args) - + # Start Uvicorn in single process mode uvicorn_config = { "app": app, # Pass application instance directly instead of string path @@ -498,7 +499,7 @@ def main(): "port": args.port, "log_config": None, # Disable default config } - + if args.ssl: uvicorn_config.update( { @@ -506,7 +507,7 @@ def main(): "ssl_keyfile": args.ssl_keyfile, } ) - + print(f"Starting Uvicorn server in single-process mode on {args.host}:{args.port}") uvicorn.run(**uvicorn_config) diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index f13cdfb6..0d935ebd 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -10,7 +10,11 @@ from lightrag.utils import ( logger, write_json, ) -from .shared_storage import get_namespace_data, get_storage_lock, try_initialize_namespace +from .shared_storage import ( + get_namespace_data, + get_storage_lock, + try_initialize_namespace, +) @final @@ -20,7 +24,7 @@ class JsonKVStorage(BaseKVStorage): working_dir = self.global_config["working_dir"] self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json") self._storage_lock = get_storage_lock() - + # check need_init must before get_namespace_data need_init = try_initialize_namespace(self.namespace) self._data = get_namespace_data(self.namespace) diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 953a19a7..43dbcf97 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -11,7 +11,12 @@ from lightrag.utils import ( ) import pipmaster as pm from lightrag.base import BaseVectorStorage -from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace +from .shared_storage import ( + get_storage_lock, + get_namespace_object, + is_multiprocess, + try_initialize_namespace, +) if not pm.is_installed("nano-vectordb"): pm.install("nano-vectordb") diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index db059393..c42a1981 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -6,7 +6,12 @@ import numpy as np from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge from lightrag.utils import logger from lightrag.base import BaseGraphStorage -from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace +from .shared_storage import ( + get_storage_lock, + get_namespace_object, + is_multiprocess, + try_initialize_namespace, +) import pipmaster as pm @@ -74,16 +79,14 @@ class NetworkXStorage(BaseGraphStorage): self.global_config["working_dir"], f"graph_{self.namespace}.graphml" ) self._storage_lock = get_storage_lock() - + # check need_init must before get_namespace_object need_init = try_initialize_namespace(self.namespace) self._graph = get_namespace_object(self.namespace) - + if need_init: if is_multiprocess: - preloaded_graph = NetworkXStorage.load_nx_graph( - self._graphml_xml_file - ) + preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file) self._graph.value = preloaded_graph or nx.Graph() if preloaded_graph: logger.info( @@ -92,9 +95,7 @@ class NetworkXStorage(BaseGraphStorage): else: logger.info("Created new empty graph") else: - preloaded_graph = NetworkXStorage.load_nx_graph( - self._graphml_xml_file - ) + preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file) self._graph = preloaded_graph or nx.Graph() if preloaded_graph: logger.info( diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 73ffb306..d8cf71c9 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -4,16 +4,17 @@ from multiprocessing.synchronize import Lock as ProcessLock from threading import Lock as ThreadLock from multiprocessing import Manager from typing import Any, Dict, Optional, Union -from lightrag.utils import logger + # Define a direct print function for critical logs that must be visible in all processes def direct_log(message, level="INFO"): """ Log a message directly to stderr to ensure visibility in all processes, including the Gunicorn master process. - """ + """ print(f"{level}: {message}", file=sys.stderr, flush=True) + LockType = Union[ProcessLock, ThreadLock] _manager = None @@ -31,39 +32,53 @@ _global_lock: Optional[LockType] = None def initialize_share_data(workers: int = 1): """ Initialize shared storage data for single or multi-process mode. - + When used with Gunicorn's preload feature, this function is called once in the master process before forking worker processes, allowing all workers to share the same initialized data. - + In single-process mode, this function is called during LightRAG object initialization. - + The function determines whether to use cross-process shared variables for data storage based on the number of workers. If workers=1, it uses thread locks and local dictionaries. If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager. - + Args: workers (int): Number of worker processes. If 1, single-process mode is used. If > 1, multi-process mode with shared memory is used. """ - global _manager, is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized - + global \ + _manager, \ + is_multiprocess, \ + is_multiprocess, \ + _global_lock, \ + _shared_dicts, \ + _share_objects, \ + _init_flags, \ + _initialized + # Check if already initialized if _initialized: - direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})") + direct_log( + f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})" + ) return - + _manager = Manager() # Force multi-process mode if workers > 1 if workers > 1: is_multiprocess = True - _global_lock = _manager.Lock() + _global_lock = _manager.Lock() # Create shared dictionaries with manager _shared_dicts = _manager.dict() _share_objects = _manager.dict() - _init_flags = _manager.dict() # Use shared dictionary to store initialization flags - direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})") + _init_flags = ( + _manager.dict() + ) # Use shared dictionary to store initialization flags + direct_log( + f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})" + ) else: is_multiprocess = False _global_lock = ThreadLock() @@ -75,6 +90,7 @@ def initialize_share_data(workers: int = 1): # Mark as initialized _initialized = True + def try_initialize_namespace(namespace: str) -> bool: """ Try to initialize a namespace. Returns True if the current process gets initialization permission. @@ -83,8 +99,11 @@ def try_initialize_namespace(namespace: str) -> bool: global _init_flags, _manager if _init_flags is None: - direct_log(f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") - raise ValueError("Shared dictionaries not initialized") + direct_log( + f"Error: try to create nanmespace before Shared-Data is initialized, pid={os.getpid()}", + level="ERROR", + ) + raise ValueError("Shared dictionaries not initialized") if namespace not in _init_flags: _init_flags[namespace] = True @@ -112,7 +131,10 @@ def get_namespace_object(namespace: str) -> Any: """Get an object for specific namespace""" if _share_objects is None: - direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") + direct_log( + f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", + level="ERROR", + ) raise ValueError("Shared dictionaries not initialized") lock = _get_global_lock() @@ -123,14 +145,20 @@ def get_namespace_object(namespace: str) -> Any: _share_objects[namespace] = _manager.Value("O", None) else: _share_objects[namespace] = None - direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}") + direct_log( + f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}" + ) return _share_objects[namespace] + def get_namespace_data(namespace: str) -> Dict[str, Any]: """get storage space for specific storage type(namespace)""" if _shared_dicts is None: - direct_log(f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR") + direct_log( + f"Error: try to getnanmespace before Shared-Data is initialized, pid={os.getpid()}", + level="ERROR", + ) raise ValueError("Shared dictionaries not initialized") lock = _get_global_lock() @@ -140,8 +168,10 @@ def get_namespace_data(namespace: str) -> Dict[str, Any]: _shared_dicts[namespace] = _manager.dict() else: _shared_dicts[namespace] = {} - direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}") - + direct_log( + f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}" + ) + return _shared_dicts[namespace] @@ -153,22 +183,33 @@ def get_scan_progress() -> Dict[str, Any]: def finalize_share_data(): """ Release shared resources and clean up. - + This function should be called when the application is shutting down to properly release shared resources and avoid memory leaks. - + In multi-process mode, it shuts down the Manager and releases all shared objects. In single-process mode, it simply resets the global variables. """ - global _manager, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized - + global \ + _manager, \ + is_multiprocess, \ + _global_lock, \ + _shared_dicts, \ + _share_objects, \ + _init_flags, \ + _initialized + # Check if already initialized if not _initialized: - direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize") + direct_log( + f"Process {os.getpid()} storage data not initialized, nothing to finalize" + ) return - - direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})") - + + direct_log( + f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})" + ) + # In multi-process mode, shut down the Manager if is_multiprocess and _manager is not None: try: @@ -179,13 +220,15 @@ def finalize_share_data(): _share_objects.clear() if _init_flags is not None: _init_flags.clear() - + # Shut down the Manager _manager.shutdown() direct_log(f"Process {os.getpid()} Manager shutdown complete") except Exception as e: - direct_log(f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR") - + direct_log( + f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR" + ) + # Reset global variables _manager = None _initialized = None @@ -194,5 +237,5 @@ def finalize_share_data(): _share_objects = None _init_flags = None _global_lock = None - + direct_log(f"Process {os.getpid()} storage data finalization complete") diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 924fbae3..ae250bac 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -271,12 +271,17 @@ class LightRAG: set_logger(self.log_file_path, self.log_level) logger.info(f"Logger initialized for working directory: {self.working_dir}") - from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data + from lightrag.kg.shared_storage import ( + initialize_share_data, + try_initialize_namespace, + get_namespace_data, + ) + initialize_share_data() - need_init = try_initialize_namespace("scan_progress") + need_init = try_initialize_namespace("scan_progress") scan_progress = get_namespace_data("scan_progress") - logger.info(f"scan_progress type after init: {type(scan_progress)}") + logger.info(f"scan_progress type after init: {type(scan_progress)}") scan_progress.update( { "is_scanning": False, @@ -286,9 +291,6 @@ class LightRAG: "progress": 0, } ) - scan_progress = get_namespace_data("scan_progress") - logger.info(f"scan_progress type after update: {type(scan_progress)}") - logger.info(f"Scan_progres value after update: {scan_progress}") if not os.path.exists(self.working_dir): logger.info(f"Creating working directory {self.working_dir}") diff --git a/run_with_gunicorn.py b/run_with_gunicorn.py index 44a49e93..705cb88f 100755 --- a/run_with_gunicorn.py +++ b/run_with_gunicorn.py @@ -2,127 +2,149 @@ """ Start LightRAG server with Gunicorn """ + import os import sys import json import signal import argparse -import subprocess from lightrag.api.utils_api import parse_args, display_splash_screen from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data + # Signal handler for graceful shutdown def signal_handler(sig, frame): - print("\n\n" + "="*80) + print("\n\n" + "=" * 80) print("RECEIVED TERMINATION SIGNAL") print(f"Process ID: {os.getpid()}") - print("="*80 + "\n") - + print("=" * 80 + "\n") + # Release shared resources finalize_share_data() - + # Exit with success status sys.exit(0) + def main(): # Register signal handlers for graceful shutdown signal.signal(signal.SIGINT, signal_handler) # Ctrl+C - signal.signal(signal.SIGTERM, signal_handler) # kill command + signal.signal(signal.SIGTERM, signal_handler) # kill command # Create a parser to handle Gunicorn-specific parameters - parser = argparse.ArgumentParser( - description="Start LightRAG server with Gunicorn" - ) + parser = argparse.ArgumentParser(description="Start LightRAG server with Gunicorn") parser.add_argument( "--workers", type=int, - help="Number of worker processes (overrides the default or config.ini setting)" + help="Number of worker processes (overrides the default or config.ini setting)", ) parser.add_argument( - "--timeout", - type=int, - help="Worker timeout in seconds (default: 120)" + "--timeout", type=int, help="Worker timeout in seconds (default: 120)" ) parser.add_argument( "--log-level", choices=["debug", "info", "warning", "error", "critical"], - help="Gunicorn log level" + help="Gunicorn log level", ) - + # Parse Gunicorn-specific arguments gunicorn_args, remaining_args = parser.parse_known_args() - + # Pass remaining arguments to LightRAG's parse_args sys.argv = [sys.argv[0]] + remaining_args args = parse_args() - + # If workers specified, override args value if gunicorn_args.workers: args.workers = gunicorn_args.workers os.environ["WORKERS"] = str(gunicorn_args.workers) - + # If timeout specified, set environment variable if gunicorn_args.timeout: os.environ["TIMEOUT"] = str(gunicorn_args.timeout) - + # If log-level specified, set environment variable if gunicorn_args.log_level: os.environ["LOG_LEVEL"] = gunicorn_args.log_level - + # Save all LightRAG args to environment variable for worker processes # This is the key step for passing arguments to lightrag_server.py os.environ["LIGHTRAG_ARGS"] = json.dumps(vars(args)) - + # Display startup information display_splash_screen(args) - + print("🚀 Starting LightRAG with Gunicorn") print(f"🔄 Worker management: Gunicorn (workers={args.workers})") print("🔍 Preloading app: Enabled") print("📝 Note: Using Gunicorn's preload feature for shared data initialization") - print("\n\n" + "="*80) + print("\n\n" + "=" * 80) print("MAIN PROCESS INITIALIZATION") print(f"Process ID: {os.getpid()}") print(f"Workers setting: {args.workers}") - print("="*80 + "\n") - + print("=" * 80 + "\n") + # Start application with Gunicorn using direct Python API # Ensure WORKERS environment variable is set before importing gunicorn_config if args.workers > 1: os.environ["WORKERS"] = str(args.workers) - + # Import Gunicorn's StandaloneApplication from gunicorn.app.base import BaseApplication - + # Define a custom application class that loads our config class GunicornApp(BaseApplication): def __init__(self, app, options=None): self.options = options or {} self.application = app super().__init__() - + def load_config(self): # Define valid Gunicorn configuration options valid_options = { - 'bind', 'workers', 'worker_class', 'timeout', 'keepalive', - 'preload_app', 'errorlog', 'accesslog', 'loglevel', - 'certfile', 'keyfile', 'limit_request_line', 'limit_request_fields', - 'limit_request_field_size', 'graceful_timeout', 'max_requests', - 'max_requests_jitter' + "bind", + "workers", + "worker_class", + "timeout", + "keepalive", + "preload_app", + "errorlog", + "accesslog", + "loglevel", + "certfile", + "keyfile", + "limit_request_line", + "limit_request_fields", + "limit_request_field_size", + "graceful_timeout", + "max_requests", + "max_requests_jitter", } - + # Special hooks that need to be set separately special_hooks = { - 'on_starting', 'on_reload', 'on_exit', 'pre_fork', 'post_fork', - 'pre_exec', 'pre_request', 'post_request', 'worker_init', - 'worker_exit', 'nworkers_changed', 'child_exit' + "on_starting", + "on_reload", + "on_exit", + "pre_fork", + "post_fork", + "pre_exec", + "pre_request", + "post_request", + "worker_init", + "worker_exit", + "nworkers_changed", + "child_exit", } - + # Import the gunicorn_config module directly import importlib.util - spec = importlib.util.spec_from_file_location("gunicorn_config", "gunicorn_config.py") + + spec = importlib.util.spec_from_file_location( + "gunicorn_config", "gunicorn_config.py" + ) self.config_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(self.config_module) - + # Set configuration options for key in dir(self.config_module): if key in valid_options: @@ -135,7 +157,7 @@ def main(): value = getattr(self.config_module, key) if callable(value): self.cfg.set(key, value) - + # Override with command line arguments if provided if gunicorn_args.workers: self.cfg.set("workers", gunicorn_args.workers) @@ -143,18 +165,18 @@ def main(): self.cfg.set("timeout", gunicorn_args.timeout) if gunicorn_args.log_level: self.cfg.set("loglevel", gunicorn_args.log_level) - + def load(self): # Import the application from lightrag.api.lightrag_server import get_application + return get_application() - + # Create the application app = GunicornApp("") - + # Directly call initialize_share_data with the correct workers value - from lightrag.kg.shared_storage import initialize_share_data - + # Force workers to be an integer and greater than 1 for multi-process mode workers_count = int(args.workers) if workers_count > 1: @@ -163,10 +185,11 @@ def main(): initialize_share_data(workers_count) else: initialize_share_data(1) - + # Run the application print("\nStarting Gunicorn with direct Python API...") app.run() + if __name__ == "__main__": main()