diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index f5f9f8ea..76901b90 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -39,6 +39,12 @@ from .routers.graph_routes import create_graph_routes from .routers.ollama_api import OllamaAPI from lightrag.utils import logger, set_verbose_debug +from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + initialize_pipeline_status, + get_all_update_flags_status, +) # Load environment variables load_dotenv(override=True) @@ -134,13 +140,6 @@ def create_app(args): try: # Initialize database connections await rag.initialize_storages() - - # Import necessary functions from shared_storage - from lightrag.kg.shared_storage import ( - get_namespace_data, - get_pipeline_status_lock, - initialize_pipeline_status, - ) await initialize_pipeline_status() # Auto scan documents if enabled @@ -376,6 +375,9 @@ def create_app(args): @app.get("/health", dependencies=[Depends(optional_api_key)]) async def get_status(): """Get current system status""" + # Get update flags status for all namespaces + update_status = await get_all_update_flags_status() + return { "status": "healthy", "working_directory": str(args.working_dir), @@ -395,6 +397,7 @@ def create_app(args): "graph_storage": args.graph_storage, "vector_storage": args.vector_storage, }, + "update_status": update_status, } # Webui mount webui/index.html diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 237ed302..27d23f2e 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -222,6 +222,30 @@ async def set_all_update_flags(namespace: str): _update_flags[namespace][i] = True +async def get_all_update_flags_status() -> Dict[str, list]: + """ + Get update flags status for all namespaces. + + Returns: + Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses + """ + if _update_flags is None: + return {} + + result = {} + async with get_internal_lock(): + for namespace, flags in _update_flags.items(): + worker_statuses = [] + for flag in flags: + if is_multiprocess: + worker_statuses.append(flag.value) + else: + worker_statuses.append(flag) + result[namespace] = worker_statuses + + return result + + def try_initialize_namespace(namespace: str) -> bool: """ Try to initialize a namespace. Returns True if the current process gets initialization permission.