Add concurrency check for auto scan task to prevent duplicate scans

• Add pipeline status check before scan
• Add storage lock protection
• Add latest_message to status tracking
• Add helpful log message at startup
This commit is contained in:
yangdx
2025-02-28 12:22:20 +08:00
parent 04bd5413c9
commit b090a22be7
2 changed files with 20 additions and 4 deletions

View File

@@ -145,10 +145,25 @@ def create_app(args):
# Auto scan documents if enabled # Auto scan documents if enabled
if args.auto_scan_at_startup: if args.auto_scan_at_startup:
# Import necessary functions from shared_storage
from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
# Get pipeline status and lock
pipeline_status = get_namespace_data("pipeline_status")
storage_lock = get_storage_lock()
# Check if a task is already running (with lock protection)
should_start_task = False
with storage_lock:
if not pipeline_status.get("busy", False):
should_start_task = True
# Only start the task if no other task is running
if should_start_task:
# Create background task # Create background task
task = asyncio.create_task(run_scanning_process(rag, doc_manager)) task = asyncio.create_task(run_scanning_process(rag, doc_manager))
app.state.background_tasks.add(task) app.state.background_tasks.add(task)
task.add_done_callback(app.state.background_tasks.discard) task.add_done_callback(app.state.background_tasks.discard)
logger.info("Auto scan task started at startup.")
ASCIIColors.green("\nServer is ready to accept connections! 🚀\n") ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")

View File

@@ -91,6 +91,7 @@ def initialize_share_data(workers: int = 1):
"batchs": 0, # Number of batches for processing documents "batchs": 0, # Number of batches for processing documents
"cur_batch": 0, # Current processing batch "cur_batch": 0, # Current processing batch
"request_pending": False, # Flag for pending request for processing "request_pending": False, # Flag for pending request for processing
"latest_message": "" # Latest message from pipeline processing
}) })