From 0761af19c68210ef2f7fcac909adc255fea26d84 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 13:41:37 +0800 Subject: [PATCH] Files are now processed in batches in auto scan --- lightrag/api/routers/document_routes.py | 23 +++++++++++++++++++++-- lightrag/api/utils_api.py | 3 +++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index a26216d2..248505ba 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -472,11 +472,30 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): total_files = len(new_files) logger.info(f"Found {total_files} new files to index.") - if new_files: - await pipeline_index_files(rag, new_files) + if not new_files: + return + + # Get MAX_PARALLEL_INSERT from global_args + max_parallel = global_args["max_parallel_insert"] + # Calculate batch size as 2 * MAX_PARALLEL_INSERT + batch_size = 2 * max_parallel + + # Process files in batches + for i in range(0, total_files, batch_size): + batch_files = new_files[i:i+batch_size] + batch_num = i // batch_size + 1 + total_batches = (total_files + batch_size - 1) // batch_size + + logger.info(f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files") + await pipeline_index_files(rag, batch_files) + + # Log progress + processed = min(i + batch_size, total_files) + logger.info(f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)") except Exception as e: logger.error(f"Error during scanning process: {str(e)}") + logger.error(traceback.format_exc()) def create_document_routes( diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index 53ea3ae8..53da3e34 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -365,6 +365,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace: "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE ) + # Get MAX_PARALLEL_INSERT from environment + global_args["max_parallel_insert"] = get_env_value("MAX_PARALLEL_INSERT", 2, int) + # Handle openai-ollama special case if args.llm_binding == "openai-ollama": args.llm_binding = "openai"