From 5d64f3b0a03fbb856504f5729530166d1107f495 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 10 Mar 2025 17:14:14 +0800 Subject: [PATCH] Improved auto-scan task initialization and status tracking. - Added autoscan status tracking in pipeline - Ensured auto-scan runs only once per startup --- lightrag/api/lightrag_server.py | 34 +++++++++++++++++---------------- lightrag/kg/shared_storage.py | 1 + 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 8871650a..fd09a691 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -141,23 +141,25 @@ def create_app(args): try: # Initialize database connections await rag.initialize_storages() - await initialize_pipeline_status() - # Auto scan documents if enabled - if args.auto_scan_at_startup: - # Check if a task is already running (with lock protection) - pipeline_status = await get_namespace_data("pipeline_status") - should_start_task = False - async with get_pipeline_status_lock(): - if not pipeline_status.get("busy", False): - should_start_task = True - # Only start the task if no other task is running - if should_start_task: - # Create background task - task = asyncio.create_task(run_scanning_process(rag, doc_manager)) - app.state.background_tasks.add(task) - task.add_done_callback(app.state.background_tasks.discard) - logger.info("Auto scan task started at startup.") + await initialize_pipeline_status() + pipeline_status = await get_namespace_data("pipeline_status") + + should_start_autoscan = False + async with get_pipeline_status_lock(): + # Auto scan documents if enabled + if args.auto_scan_at_startup: + if not pipeline_status.get("autoscanned", False): + pipeline_status["autoscanned"] = True + should_start_autoscan = True + + # Only run auto scan when no other process started it first + if should_start_autoscan: + # Create background task + task = asyncio.create_task(run_scanning_process(rag, doc_manager)) + app.state.background_tasks.add(task) + task.add_done_callback(app.state.background_tasks.discard) + logger.info(f"Process {os.getpid()} auto scan task started at startup.") ASCIIColors.green("\nServer is ready to accept connections! 🚀\n") diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 382e490b..736887a6 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -286,6 +286,7 @@ async def initialize_pipeline_status(): history_messages = _manager.list() if is_multiprocess else [] pipeline_namespace.update( { + "autoscanned": False, # Auto-scan started "busy": False, # Control concurrent processes "job_name": "Default Job", # Current job name (indexing files/indexing texts) "job_start": None, # Job start time