Merge branch 'add-multi-worker-support' into max-graph-node

This commit is contained in:
yangdx
2025-03-02 18:24:22 +08:00
3 changed files with 204 additions and 0 deletions

View File

@@ -0,0 +1,187 @@
# gunicorn_config.py
import os
import logging
from lightrag.kg.shared_storage import finalize_share_data
from lightrag.api.lightrag_server import LightragPathFilter
# Get log directory path from environment variable
log_dir = os.getenv("LOG_DIR", os.getcwd())
log_file_path = os.path.abspath(os.path.join(log_dir, "lightrag.log"))
# Get log file max size and backup count from environment variables
log_max_bytes = int(os.getenv("LOG_MAX_BYTES", 10485760)) # Default 10MB
log_backup_count = int(os.getenv("LOG_BACKUP_COUNT", 5)) # Default 5 backups
# These variables will be set by run_with_gunicorn.py
workers = None
bind = None
loglevel = None
certfile = None
keyfile = None
# Enable preload_app option
preload_app = True
# Use Uvicorn worker
worker_class = "uvicorn.workers.UvicornWorker"
# Other Gunicorn configurations
timeout = int(os.getenv("TIMEOUT", 150)) # Default 150s to match run_with_gunicorn.py
keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s
# Logging configuration
errorlog = os.getenv("ERROR_LOG", log_file_path) # Default write to lightrag.log
accesslog = os.getenv("ACCESS_LOG", log_file_path) # Default write to lightrag.log
logconfig_dict = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "standard",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "standard",
"filename": log_file_path,
"maxBytes": log_max_bytes,
"backupCount": log_backup_count,
"encoding": "utf8",
},
},
"filters": {
"path_filter": {
"()": "lightrag.api.lightrag_server.LightragPathFilter",
},
},
"loggers": {
"lightrag": {
"handlers": ["console", "file"],
"level": loglevel.upper() if loglevel else "INFO",
"propagate": False,
},
"gunicorn": {
"handlers": ["console", "file"],
"level": loglevel.upper() if loglevel else "INFO",
"propagate": False,
},
"gunicorn.error": {
"handlers": ["console", "file"],
"level": loglevel.upper() if loglevel else "INFO",
"propagate": False,
},
"gunicorn.access": {
"handlers": ["console", "file"],
"level": loglevel.upper() if loglevel else "INFO",
"propagate": False,
"filters": ["path_filter"],
},
},
}
def on_starting(server):
"""
Executed when Gunicorn starts, before forking the first worker processes
You can use this function to do more initialization tasks for all processes
"""
print("=" * 80)
print(f"GUNICORN MASTER PROCESS: on_starting jobs for {workers} worker(s)")
print(f"Process ID: {os.getpid()}")
print("=" * 80)
# Memory usage monitoring
try:
import psutil
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
msg = (
f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
)
print(msg)
except ImportError:
print("psutil not installed, skipping memory usage reporting")
print("Gunicorn initialization complete, forking workers...\n")
def on_exit(server):
"""
Executed when Gunicorn is shutting down.
This is a good place to release shared resources.
"""
print("=" * 80)
print("GUNICORN MASTER PROCESS: Shutting down")
print(f"Process ID: {os.getpid()}")
print("=" * 80)
# Release shared resources
finalize_share_data()
print("=" * 80)
print("Gunicorn shutdown complete")
print("=" * 80)
def post_fork(server, worker):
"""
Executed after a worker has been forked.
This is a good place to set up worker-specific configurations.
"""
# Configure formatters
detailed_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
simple_formatter = logging.Formatter("%(levelname)s: %(message)s")
def setup_logger(logger_name: str, level: str = "INFO", add_filter: bool = False):
"""Set up a logger with console and file handlers"""
logger_instance = logging.getLogger(logger_name)
logger_instance.setLevel(level)
logger_instance.handlers = [] # Clear existing handlers
logger_instance.propagate = False
# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(simple_formatter)
console_handler.setLevel(level)
logger_instance.addHandler(console_handler)
# Add file handler
file_handler = logging.handlers.RotatingFileHandler(
filename=log_file_path,
maxBytes=log_max_bytes,
backupCount=log_backup_count,
encoding="utf-8",
)
file_handler.setFormatter(detailed_formatter)
file_handler.setLevel(level)
logger_instance.addHandler(file_handler)
# Add path filter if requested
if add_filter:
path_filter = LightragPathFilter()
logger_instance.addFilter(path_filter)
# Set up main loggers
log_level = loglevel.upper() if loglevel else "INFO"
setup_logger("uvicorn", log_level)
setup_logger("uvicorn.access", log_level, add_filter=True)
setup_logger("lightrag", log_level, add_filter=True)
# Set up lightrag submodule loggers
for name in logging.root.manager.loggerDict:
if name.startswith("lightrag."):
setup_logger(name, log_level, add_filter=True)
# Disable uvicorn.error logger
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.handlers = []
uvicorn_error_logger.setLevel(logging.CRITICAL)
uvicorn_error_logger.propagate = False

View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python
"""
Start LightRAG server with Gunicorn
"""
import os
import sys
import signal
import pipmaster as pm
from lightrag.api.utils_api import parse_args, display_splash_screen
from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
def check_and_install_dependencies():
"""Check and install required dependencies"""
required_packages = [
"gunicorn",
"tiktoken",
"psutil",
# Add other required packages here
]
for package in required_packages:
if not pm.is_installed(package):
print(f"Installing {package}...")
pm.install(package)
print(f"{package} installed successfully")
# Signal handler for graceful shutdown
def signal_handler(sig, frame):
print("\n\n" + "=" * 80)
print("RECEIVED TERMINATION SIGNAL")
print(f"Process ID: {os.getpid()}")
print("=" * 80 + "\n")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
def main():
# Check and install dependencies
check_and_install_dependencies()
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
# Parse all arguments using parse_args
args = parse_args(is_uvicorn_mode=False)
# Display startup information
display_splash_screen(args)
print("🚀 Starting LightRAG with Gunicorn")
print(f"🔄 Worker management: Gunicorn (workers={args.workers})")
print("🔍 Preloading app: Enabled")
print("📝 Note: Using Gunicorn's preload feature for shared data initialization")
print("\n\n" + "=" * 80)
print("MAIN PROCESS INITIALIZATION")
print(f"Process ID: {os.getpid()}")
print(f"Workers setting: {args.workers}")
print("=" * 80 + "\n")
# Import Gunicorn's StandaloneApplication
from gunicorn.app.base import BaseApplication
# Define a custom application class that loads our config
class GunicornApp(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super().__init__()
def load_config(self):
# Define valid Gunicorn configuration options
valid_options = {
"bind",
"workers",
"worker_class",
"timeout",
"keepalive",
"preload_app",
"errorlog",
"accesslog",
"loglevel",
"certfile",
"keyfile",
"limit_request_line",
"limit_request_fields",
"limit_request_field_size",
"graceful_timeout",
"max_requests",
"max_requests_jitter",
}
# Special hooks that need to be set separately
special_hooks = {
"on_starting",
"on_reload",
"on_exit",
"pre_fork",
"post_fork",
"pre_exec",
"pre_request",
"post_request",
"worker_init",
"worker_exit",
"nworkers_changed",
"child_exit",
}
# Import and configure the gunicorn_config module
from lightrag.api import gunicorn_config
# Set configuration variables in gunicorn_config, prioritizing command line arguments
gunicorn_config.workers = (
args.workers if args.workers else int(os.getenv("WORKERS", 1))
)
# Bind configuration prioritizes command line arguments
host = args.host if args.host != "0.0.0.0" else os.getenv("HOST", "0.0.0.0")
port = args.port if args.port != 9621 else int(os.getenv("PORT", 9621))
gunicorn_config.bind = f"{host}:{port}"
# Log level configuration prioritizes command line arguments
gunicorn_config.loglevel = (
args.log_level.lower()
if args.log_level
else os.getenv("LOG_LEVEL", "info")
)
# Timeout configuration prioritizes command line arguments
gunicorn_config.timeout = (
args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150))
)
# Keepalive configuration
gunicorn_config.keepalive = int(os.getenv("KEEPALIVE", 5))
# SSL configuration prioritizes command line arguments
if args.ssl or os.getenv("SSL", "").lower() in (
"true",
"1",
"yes",
"t",
"on",
):
gunicorn_config.certfile = (
args.ssl_certfile
if args.ssl_certfile
else os.getenv("SSL_CERTFILE")
)
gunicorn_config.keyfile = (
args.ssl_keyfile if args.ssl_keyfile else os.getenv("SSL_KEYFILE")
)
# Set configuration options from the module
for key in dir(gunicorn_config):
if key in valid_options:
value = getattr(gunicorn_config, key)
# Skip functions like on_starting and None values
if not callable(value) and value is not None:
self.cfg.set(key, value)
# Set special hooks
elif key in special_hooks:
value = getattr(gunicorn_config, key)
if callable(value):
self.cfg.set(key, value)
if hasattr(gunicorn_config, "logconfig_dict"):
self.cfg.set(
"logconfig_dict", getattr(gunicorn_config, "logconfig_dict")
)
def load(self):
# Import the application
from lightrag.api.lightrag_server import get_application
return get_application(args)
# Create the application
app = GunicornApp("")
# Force workers to be an integer and greater than 1 for multi-process mode
workers_count = int(args.workers)
if workers_count > 1:
# Set a flag to indicate we're in the main process
os.environ["LIGHTRAG_MAIN_PROCESS"] = "1"
initialize_share_data(workers_count)
else:
initialize_share_data(1)
# Run the application
print("\nStarting Gunicorn with direct Python API...")
app.run()
if __name__ == "__main__":
main()