From b090a22be7553b947f182138e4fe4379dc915507 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 28 Feb 2025 12:22:20 +0800 Subject: [PATCH] Add concurrency check for auto scan task to prevent duplicate scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add pipeline status check before scan • Add storage lock protection • Add latest_message to status tracking • Add helpful log message at startup --- lightrag/api/lightrag_server.py | 23 +++++++++++++++++++---- lightrag/kg/shared_storage.py | 1 + 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 155e22f5..9af1a90e 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -145,10 +145,25 @@ def create_app(args): # Auto scan documents if enabled if args.auto_scan_at_startup: - # Create background task - task = asyncio.create_task(run_scanning_process(rag, doc_manager)) - app.state.background_tasks.add(task) - task.add_done_callback(app.state.background_tasks.discard) + # Import necessary functions from shared_storage + from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock + + # Get pipeline status and lock + pipeline_status = get_namespace_data("pipeline_status") + storage_lock = get_storage_lock() + + # Check if a task is already running (with lock protection) + should_start_task = False + with storage_lock: + if not pipeline_status.get("busy", False): + should_start_task = True + # Only start the task if no other task is running + if should_start_task: + # Create background task + task = asyncio.create_task(run_scanning_process(rag, doc_manager)) + app.state.background_tasks.add(task) + task.add_done_callback(app.state.background_tasks.discard) + logger.info("Auto scan task started at startup.") ASCIIColors.green("\nServer is ready to accept connections! 🚀\n") diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 9369376e..a4970321 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -91,6 +91,7 @@ def initialize_share_data(workers: int = 1): "batchs": 0, # Number of batches for processing documents "cur_batch": 0, # Current processing batch "request_pending": False, # Flag for pending request for processing + "latest_message": "" # Latest message from pipeline processing })