Optimized the document processing pipeline

- Enqueue all files to the pipeline before starting processing when scanning the input folder
- Changed parallel enqueueing to sequential to prevent CPU overload from heavy file-extraction jobs (see the sketch below)
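
In essence, the enqueue step changes from fanning out every extraction job at once with `asyncio.gather` to awaiting one job at a time. A minimal sketch of the two patterns, assuming extraction is CPU-heavy; `fake_enqueue` and its timing are stand-ins, not LightRAG code:

```python
import asyncio
import time

async def fake_enqueue(path: str) -> bool:
    # Stand-in for pipeline_enqueue_file: heavy parsing/extraction work.
    await asyncio.to_thread(time.sleep, 0.2)  # pretend this burns a CPU core
    return True

async def enqueue_parallel(paths: list[str]) -> bool:
    # Before: all extraction jobs start at once, so N files -> N jobs in flight.
    return any(await asyncio.gather(*(fake_enqueue(p) for p in paths)))

async def enqueue_sequential(paths: list[str]) -> bool:
    # After: at most one extraction job in flight, keeping CPU load flat.
    enqueued = False
    for path in paths:
        if await fake_enqueue(path):
            enqueued = True
    return enqueued

if __name__ == "__main__":
    print(asyncio.run(enqueue_sequential(["a.pdf", "b.pdf", "c.pdf"])))
```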
yangdx
2025-03-21 13:08:57 +08:00
parent 7a1e4df875
commit 168232803b

```diff
@@ -405,7 +405,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path):

 async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
-    """Index multiple files concurrently
+    """Index multiple files sequentially to avoid high CPU load

     Args:
         rag: LightRAG instance
@@ -416,12 +416,12 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]):
     try:
         enqueued = False

-        if len(file_paths) == 1:
-            enqueued = await pipeline_enqueue_file(rag, file_paths[0])
-        else:
-            tasks = [pipeline_enqueue_file(rag, path) for path in file_paths]
-            enqueued = any(await asyncio.gather(*tasks))
+        # Process files sequentially
+        for file_path in file_paths:
+            if await pipeline_enqueue_file(rag, file_path):
+                enqueued = True

+        # Process the queue only if at least one file was successfully enqueued
         if enqueued:
             await rag.apipeline_process_enqueue_documents()
     except Exception as e:
@@ -472,11 +472,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
         total_files = len(new_files)
         logger.info(f"Found {total_files} new files to index.")

-        for idx, file_path in enumerate(new_files):
-            try:
-                await pipeline_index_file(rag, file_path)
-            except Exception as e:
-                logger.error(f"Error indexing file {file_path}: {str(e)}")
+        if new_files:
+            await pipeline_index_files(rag, new_files)

     except Exception as e:
         logger.error(f"Error during scanning process: {str(e)}")
```