Fix linting

This commit is contained in:
yangdx
2025-02-27 19:05:51 +08:00
parent 946095ef80
commit 64f22966a3
8 changed files with 196 additions and 112 deletions

View File

@@ -8,7 +8,7 @@ from lightrag.api.utils_api import parse_args
args = parse_args()
# Determine worker count - from environment variable or command line arguments
workers = int(os.getenv('WORKERS', args.workers))
workers = int(os.getenv("WORKERS", args.workers))
# If not specified, use CPU count * 2 + 1 (Gunicorn recommended configuration)
if workers <= 1:
@@ -24,7 +24,7 @@ preload_app = True
worker_class = "uvicorn.workers.UvicornWorker"
# Other Gunicorn configurations
timeout = int(os.getenv('TIMEOUT', 120))
timeout = int(os.getenv("TIMEOUT", 120))
keepalive = 5
# Optional SSL configuration
@@ -33,9 +33,10 @@ if args.ssl:
keyfile = args.ssl_keyfile
# Logging configuration
errorlog = os.getenv('ERROR_LOG', '-') # '-' means stderr
accesslog = os.getenv('ACCESS_LOG', '-') # '-' means stderr
loglevel = os.getenv('LOG_LEVEL', 'info')
errorlog = os.getenv("ERROR_LOG", "-") # '-' means stderr
accesslog = os.getenv("ACCESS_LOG", "-") # '-' means stderr
loglevel = os.getenv("LOG_LEVEL", "info")
def on_starting(server):
"""
@@ -46,21 +47,25 @@ def on_starting(server):
print(f"GUNICORN MASTER PROCESS: on_starting jobs for all {workers} workers")
print(f"Process ID: {os.getpid()}")
print("=" * 80)
# Memory usage monitoring
try:
import psutil
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
msg = f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
msg = (
f"Memory usage after initialization: {memory_info.rss / 1024 / 1024:.2f} MB"
)
print(msg)
except ImportError:
print("psutil not installed, skipping memory usage reporting")
print("=" * 80)
print("Gunicorn initialization complete, forking workers...")
print("=" * 80)
def on_exit(server):
"""
Executed when Gunicorn is shutting down.
@@ -70,10 +75,10 @@ def on_exit(server):
print("GUNICORN MASTER PROCESS: Shutting down")
print(f"Process ID: {os.getpid()}")
print("=" * 80)
# Release shared resources
finalize_share_data()
print("=" * 80)
print("Gunicorn shutdown complete")
print("=" * 80)

View File

@@ -471,12 +471,13 @@ def configure_logging():
def main():
# Check if running under Gunicorn
if 'GUNICORN_CMD_ARGS' in os.environ:
if "GUNICORN_CMD_ARGS" in os.environ:
# If started with Gunicorn, return directly as Gunicorn will call get_application
print("Running under Gunicorn - worker management handled by Gunicorn")
return
from multiprocessing import freeze_support
freeze_support()
args = parse_args()
@@ -487,10 +488,10 @@ def main():
configure_logging()
display_splash_screen(args)
# Create application instance directly instead of using factory function
app = create_app(args)
# Start Uvicorn in single process mode
uvicorn_config = {
"app": app, # Pass application instance directly instead of string path
@@ -498,7 +499,7 @@ def main():
"port": args.port,
"log_config": None, # Disable default config
}
if args.ssl:
uvicorn_config.update(
{
@@ -506,7 +507,7 @@ def main():
"ssl_keyfile": args.ssl_keyfile,
}
)
print(f"Starting Uvicorn server in single-process mode on {args.host}:{args.port}")
uvicorn.run(**uvicorn_config)

View File

@@ -10,7 +10,11 @@ from lightrag.utils import (
logger,
write_json,
)
from .shared_storage import get_namespace_data, get_storage_lock, try_initialize_namespace
from .shared_storage import (
get_namespace_data,
get_storage_lock,
try_initialize_namespace,
)
@final
@@ -20,7 +24,7 @@ class JsonKVStorage(BaseKVStorage):
working_dir = self.global_config["working_dir"]
self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
self._storage_lock = get_storage_lock()
# check need_init must before get_namespace_data
need_init = try_initialize_namespace(self.namespace)
self._data = get_namespace_data(self.namespace)

View File

@@ -11,7 +11,12 @@ from lightrag.utils import (
)
import pipmaster as pm
from lightrag.base import BaseVectorStorage
from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace
from .shared_storage import (
get_storage_lock,
get_namespace_object,
is_multiprocess,
try_initialize_namespace,
)
if not pm.is_installed("nano-vectordb"):
pm.install("nano-vectordb")

View File

@@ -6,7 +6,12 @@ import numpy as np
from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
from lightrag.utils import logger
from lightrag.base import BaseGraphStorage
from .shared_storage import get_storage_lock, get_namespace_object, is_multiprocess, try_initialize_namespace
from .shared_storage import (
get_storage_lock,
get_namespace_object,
is_multiprocess,
try_initialize_namespace,
)
import pipmaster as pm
@@ -74,16 +79,14 @@ class NetworkXStorage(BaseGraphStorage):
self.global_config["working_dir"], f"graph_{self.namespace}.graphml"
)
self._storage_lock = get_storage_lock()
# check need_init must before get_namespace_object
need_init = try_initialize_namespace(self.namespace)
self._graph = get_namespace_object(self.namespace)
if need_init:
if is_multiprocess:
preloaded_graph = NetworkXStorage.load_nx_graph(
self._graphml_xml_file
)
preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
self._graph.value = preloaded_graph or nx.Graph()
if preloaded_graph:
logger.info(
@@ -92,9 +95,7 @@ class NetworkXStorage(BaseGraphStorage):
else:
logger.info("Created new empty graph")
else:
preloaded_graph = NetworkXStorage.load_nx_graph(
self._graphml_xml_file
)
preloaded_graph = NetworkXStorage.load_nx_graph(self._graphml_xml_file)
self._graph = preloaded_graph or nx.Graph()
if preloaded_graph:
logger.info(

View File

@@ -4,16 +4,17 @@ from multiprocessing.synchronize import Lock as ProcessLock
from threading import Lock as ThreadLock
from multiprocessing import Manager
from typing import Any, Dict, Optional, Union
from lightrag.utils import logger
# Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, level="INFO"):
"""
Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process.
"""
"""
print(f"{level}: {message}", file=sys.stderr, flush=True)
LockType = Union[ProcessLock, ThreadLock]
_manager = None
@@ -31,39 +32,53 @@ _global_lock: Optional[LockType] = None
def initialize_share_data(workers: int = 1):
"""
Initialize shared storage data for single or multi-process mode.
When used with Gunicorn's preload feature, this function is called once in the
master process before forking worker processes, allowing all workers to share
the same initialized data.
In single-process mode, this function is called during LightRAG object initialization.
The function determines whether to use cross-process shared variables for data storage
based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
Args:
workers (int): Number of worker processes. If 1, single-process mode is used.
If > 1, multi-process mode with shared memory is used.
"""
global _manager, is_multiprocess, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
global \
_manager, \
is_multiprocess, \
is_multiprocess, \
_global_lock, \
_shared_dicts, \
_share_objects, \
_init_flags, \
_initialized
# Check if already initialized
if _initialized:
direct_log(f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})")
direct_log(
f"Process {os.getpid()} Shared-Data already initialized (multiprocess={is_multiprocess})"
)
return
_manager = Manager()
# Force multi-process mode if workers > 1
if workers > 1:
is_multiprocess = True
_global_lock = _manager.Lock()
_global_lock = _manager.Lock()
# Create shared dictionaries with manager
_shared_dicts = _manager.dict()
_share_objects = _manager.dict()
_init_flags = _manager.dict() # Use shared dictionary to store initialization flags
direct_log(f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})")
_init_flags = (
_manager.dict()
) # Use shared dictionary to store initialization flags
direct_log(
f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
)
else:
is_multiprocess = False
_global_lock = ThreadLock()
@@ -75,6 +90,7 @@ def initialize_share_data(workers: int = 1):
# Mark as initialized
_initialized = True
def try_initialize_namespace(namespace: str) -> bool:
"""
Try to initialize a namespace. Returns True if the current process gets initialization permission.
@@ -83,8 +99,11 @@ def try_initialize_namespace(namespace: str) -> bool:
global _init_flags, _manager
if _init_flags is None:
direct_log(f"Error: try to create namespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
raise ValueError("Shared dictionaries not initialized")
direct_log(
f"Error: try to create namespace before Shared-Data is initialized, pid={os.getpid()}",
level="ERROR",
)
raise ValueError("Shared dictionaries not initialized")
if namespace not in _init_flags:
_init_flags[namespace] = True
@@ -112,7 +131,10 @@ def get_namespace_object(namespace: str) -> Any:
"""Get an object for specific namespace"""
if _share_objects is None:
direct_log(f"Error: try to get namespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
direct_log(
f"Error: try to get namespace before Shared-Data is initialized, pid={os.getpid()}",
level="ERROR",
)
raise ValueError("Shared dictionaries not initialized")
lock = _get_global_lock()
@@ -123,14 +145,20 @@ def get_namespace_object(namespace: str) -> Any:
_share_objects[namespace] = _manager.Value("O", None)
else:
_share_objects[namespace] = None
direct_log(f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}")
direct_log(
f"Created namespace({namespace}): type={type(_share_objects[namespace])}, pid={os.getpid()}"
)
return _share_objects[namespace]
def get_namespace_data(namespace: str) -> Dict[str, Any]:
"""get storage space for specific storage type(namespace)"""
if _shared_dicts is None:
direct_log(f"Error: try to get namespace before Shared-Data is initialized, pid={os.getpid()}", level="ERROR")
direct_log(
f"Error: try to get namespace before Shared-Data is initialized, pid={os.getpid()}",
level="ERROR",
)
raise ValueError("Shared dictionaries not initialized")
lock = _get_global_lock()
@@ -140,8 +168,10 @@ def get_namespace_data(namespace: str) -> Dict[str, Any]:
_shared_dicts[namespace] = _manager.dict()
else:
_shared_dicts[namespace] = {}
direct_log(f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}")
direct_log(
f"Created namespace({namespace}): type={type(_shared_dicts[namespace])}, pid={os.getpid()}"
)
return _shared_dicts[namespace]
@@ -153,22 +183,33 @@ def get_scan_progress() -> Dict[str, Any]:
def finalize_share_data():
"""
Release shared resources and clean up.
This function should be called when the application is shutting down
to properly release shared resources and avoid memory leaks.
In multi-process mode, it shuts down the Manager and releases all shared objects.
In single-process mode, it simply resets the global variables.
"""
global _manager, is_multiprocess, _global_lock, _shared_dicts, _share_objects, _init_flags, _initialized
global \
_manager, \
is_multiprocess, \
_global_lock, \
_shared_dicts, \
_share_objects, \
_init_flags, \
_initialized
# Check if already initialized
if not _initialized:
direct_log(f"Process {os.getpid()} storage data not initialized, nothing to finalize")
direct_log(
f"Process {os.getpid()} storage data not initialized, nothing to finalize"
)
return
direct_log(f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})")
direct_log(
f"Process {os.getpid()} finalizing storage data (multiprocess={is_multiprocess})"
)
# In multi-process mode, shut down the Manager
if is_multiprocess and _manager is not None:
try:
@@ -179,13 +220,15 @@ def finalize_share_data():
_share_objects.clear()
if _init_flags is not None:
_init_flags.clear()
# Shut down the Manager
_manager.shutdown()
direct_log(f"Process {os.getpid()} Manager shutdown complete")
except Exception as e:
direct_log(f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR")
direct_log(
f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
)
# Reset global variables
_manager = None
_initialized = None
@@ -194,5 +237,5 @@ def finalize_share_data():
_share_objects = None
_init_flags = None
_global_lock = None
direct_log(f"Process {os.getpid()} storage data finalization complete")

View File

@@ -271,12 +271,17 @@ class LightRAG:
set_logger(self.log_file_path, self.log_level)
logger.info(f"Logger initialized for working directory: {self.working_dir}")
from lightrag.kg.shared_storage import initialize_share_data, try_initialize_namespace, get_namespace_data
from lightrag.kg.shared_storage import (
initialize_share_data,
try_initialize_namespace,
get_namespace_data,
)
initialize_share_data()
need_init = try_initialize_namespace("scan_progress")
need_init = try_initialize_namespace("scan_progress")
scan_progress = get_namespace_data("scan_progress")
logger.info(f"scan_progress type after init: {type(scan_progress)}")
logger.info(f"scan_progress type after init: {type(scan_progress)}")
scan_progress.update(
{
"is_scanning": False,
@@ -286,9 +291,6 @@ class LightRAG:
"progress": 0,
}
)
scan_progress = get_namespace_data("scan_progress")
logger.info(f"scan_progress type after update: {type(scan_progress)}")
logger.info(f"Scan_progress value after update: {scan_progress}")
if not os.path.exists(self.working_dir):
logger.info(f"Creating working directory {self.working_dir}")

View File

@@ -2,127 +2,149 @@
"""
Start LightRAG server with Gunicorn
"""
import os
import sys
import json
import signal
import argparse
import subprocess
from lightrag.api.utils_api import parse_args, display_splash_screen
from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
# Signal handler for graceful shutdown
def signal_handler(sig, frame):
print("\n\n" + "="*80)
print("\n\n" + "=" * 80)
print("RECEIVED TERMINATION SIGNAL")
print(f"Process ID: {os.getpid()}")
print("="*80 + "\n")
print("=" * 80 + "\n")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
def main():
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
signal.signal(signal.SIGTERM, signal_handler) # kill command
# Create a parser to handle Gunicorn-specific parameters
parser = argparse.ArgumentParser(
description="Start LightRAG server with Gunicorn"
)
parser = argparse.ArgumentParser(description="Start LightRAG server with Gunicorn")
parser.add_argument(
"--workers",
type=int,
help="Number of worker processes (overrides the default or config.ini setting)"
help="Number of worker processes (overrides the default or config.ini setting)",
)
parser.add_argument(
"--timeout",
type=int,
help="Worker timeout in seconds (default: 120)"
"--timeout", type=int, help="Worker timeout in seconds (default: 120)"
)
parser.add_argument(
"--log-level",
choices=["debug", "info", "warning", "error", "critical"],
help="Gunicorn log level"
help="Gunicorn log level",
)
# Parse Gunicorn-specific arguments
gunicorn_args, remaining_args = parser.parse_known_args()
# Pass remaining arguments to LightRAG's parse_args
sys.argv = [sys.argv[0]] + remaining_args
args = parse_args()
# If workers specified, override args value
if gunicorn_args.workers:
args.workers = gunicorn_args.workers
os.environ["WORKERS"] = str(gunicorn_args.workers)
# If timeout specified, set environment variable
if gunicorn_args.timeout:
os.environ["TIMEOUT"] = str(gunicorn_args.timeout)
# If log-level specified, set environment variable
if gunicorn_args.log_level:
os.environ["LOG_LEVEL"] = gunicorn_args.log_level
# Save all LightRAG args to environment variable for worker processes
# This is the key step for passing arguments to lightrag_server.py
os.environ["LIGHTRAG_ARGS"] = json.dumps(vars(args))
# Display startup information
display_splash_screen(args)
print("🚀 Starting LightRAG with Gunicorn")
print(f"🔄 Worker management: Gunicorn (workers={args.workers})")
print("🔍 Preloading app: Enabled")
print("📝 Note: Using Gunicorn's preload feature for shared data initialization")
print("\n\n" + "="*80)
print("\n\n" + "=" * 80)
print("MAIN PROCESS INITIALIZATION")
print(f"Process ID: {os.getpid()}")
print(f"Workers setting: {args.workers}")
print("="*80 + "\n")
print("=" * 80 + "\n")
# Start application with Gunicorn using direct Python API
# Ensure WORKERS environment variable is set before importing gunicorn_config
if args.workers > 1:
os.environ["WORKERS"] = str(args.workers)
# Import Gunicorn's StandaloneApplication
from gunicorn.app.base import BaseApplication
# Define a custom application class that loads our config
class GunicornApp(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super().__init__()
def load_config(self):
# Define valid Gunicorn configuration options
valid_options = {
'bind', 'workers', 'worker_class', 'timeout', 'keepalive',
'preload_app', 'errorlog', 'accesslog', 'loglevel',
'certfile', 'keyfile', 'limit_request_line', 'limit_request_fields',
'limit_request_field_size', 'graceful_timeout', 'max_requests',
'max_requests_jitter'
"bind",
"workers",
"worker_class",
"timeout",
"keepalive",
"preload_app",
"errorlog",
"accesslog",
"loglevel",
"certfile",
"keyfile",
"limit_request_line",
"limit_request_fields",
"limit_request_field_size",
"graceful_timeout",
"max_requests",
"max_requests_jitter",
}
# Special hooks that need to be set separately
special_hooks = {
'on_starting', 'on_reload', 'on_exit', 'pre_fork', 'post_fork',
'pre_exec', 'pre_request', 'post_request', 'worker_init',
'worker_exit', 'nworkers_changed', 'child_exit'
"on_starting",
"on_reload",
"on_exit",
"pre_fork",
"post_fork",
"pre_exec",
"pre_request",
"post_request",
"worker_init",
"worker_exit",
"nworkers_changed",
"child_exit",
}
# Import the gunicorn_config module directly
import importlib.util
spec = importlib.util.spec_from_file_location("gunicorn_config", "gunicorn_config.py")
spec = importlib.util.spec_from_file_location(
"gunicorn_config", "gunicorn_config.py"
)
self.config_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(self.config_module)
# Set configuration options
for key in dir(self.config_module):
if key in valid_options:
@@ -135,7 +157,7 @@ def main():
value = getattr(self.config_module, key)
if callable(value):
self.cfg.set(key, value)
# Override with command line arguments if provided
if gunicorn_args.workers:
self.cfg.set("workers", gunicorn_args.workers)
@@ -143,18 +165,18 @@ def main():
self.cfg.set("timeout", gunicorn_args.timeout)
if gunicorn_args.log_level:
self.cfg.set("loglevel", gunicorn_args.log_level)
def load(self):
# Import the application
from lightrag.api.lightrag_server import get_application
return get_application()
# Create the application
app = GunicornApp("")
# Directly call initialize_share_data with the correct workers value
from lightrag.kg.shared_storage import initialize_share_data
# Force workers to be an integer and greater than 1 for multi-process mode
workers_count = int(args.workers)
if workers_count > 1:
@@ -163,10 +185,11 @@ def main():
initialize_share_data(workers_count)
else:
initialize_share_data(1)
# Run the application
print("\nStarting Gunicorn with direct Python API...")
app.run()
if __name__ == "__main__":
main()