Improved auto-scan task initialization and status tracking.

- Added autoscan status tracking in pipeline
- Ensured auto-scan runs only once per startup
This commit is contained in:
yangdx
2025-03-10 17:14:14 +08:00
parent 3cca18c59c
commit 5d64f3b0a0
2 changed files with 19 additions and 16 deletions

View File

@@ -141,23 +141,25 @@ def create_app(args):
try:
# Initialize database connections
await rag.initialize_storages()
await initialize_pipeline_status()
# Auto scan documents if enabled
if args.auto_scan_at_startup:
# Check if a task is already running (with lock protection)
pipeline_status = await get_namespace_data("pipeline_status")
should_start_task = False
async with get_pipeline_status_lock():
if not pipeline_status.get("busy", False):
should_start_task = True
# Only start the task if no other task is running
if should_start_task:
# Create background task
task = asyncio.create_task(run_scanning_process(rag, doc_manager))
app.state.background_tasks.add(task)
task.add_done_callback(app.state.background_tasks.discard)
logger.info("Auto scan task started at startup.")
await initialize_pipeline_status()
pipeline_status = await get_namespace_data("pipeline_status")
should_start_autoscan = False
async with get_pipeline_status_lock():
# Auto scan documents if enabled
if args.auto_scan_at_startup:
if not pipeline_status.get("autoscanned", False):
pipeline_status["autoscanned"] = True
should_start_autoscan = True
# Only run auto scan when no other process started it first
if should_start_autoscan:
# Create background task
task = asyncio.create_task(run_scanning_process(rag, doc_manager))
app.state.background_tasks.add(task)
task.add_done_callback(app.state.background_tasks.discard)
logger.info(f"Process {os.getpid()} auto scan task started at startup.")
ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")

View File

@@ -286,6 +286,7 @@ async def initialize_pipeline_status():
history_messages = _manager.list() if is_multiprocess else []
pipeline_namespace.update(
{
"autoscanned": False, # Auto-scan started
"busy": False, # Control concurrent processes
"job_name": "Default Job", # Current job name (indexing files/indexing texts)
"job_start": None, # Job start time