From 4f68f3e41003985f852aeb80eb6f648b9e48f65e Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 24 Apr 2025 13:45:44 +0800 Subject: [PATCH 1/4] Using semaphore to control parallel doc processing instead of batching. --- lightrag/lightrag.py | 293 ++++++++++++++++++++++--------------------- 1 file changed, 149 insertions(+), 144 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 45352e21..742db478 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -841,8 +841,8 @@ class LightRAG: "job_name": "Default Job", "job_start": datetime.now().isoformat(), "docs": 0, - "batchs": 0, - "cur_batch": 0, + "batchs": 0, # 将被重新定义为待处理的文件总数 + "cur_batch": 0, # 将被重新定义为当前处理的第几个文件 "request_pending": False, # Clear any previous request "latest_message": "", } @@ -867,18 +867,13 @@ class LightRAG: pipeline_status["history_messages"].append(log_message) break - # 2. split docs into chunks, insert chunks, update doc status - docs_batches = [ - list(to_process_docs.items())[i : i + self.max_parallel_insert] - for i in range(0, len(to_process_docs), self.max_parallel_insert) - ] - - log_message = f"Processing {len(to_process_docs)} document(s) in {len(docs_batches)} batches" + log_message = f"Processing {len(to_process_docs)} document(s)" logger.info(log_message) - # Update pipeline status with current batch information + # 更新 pipeline_status,batchs 现在表示待处理的文件总数 pipeline_status["docs"] = len(to_process_docs) - pipeline_status["batchs"] = len(docs_batches) + pipeline_status["batchs"] = len(to_process_docs) + pipeline_status["cur_batch"] = 0 # 初始化为0,表示当前已处理的文件数量 pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) @@ -892,6 +887,11 @@ class LightRAG: job_name = f"{path_prefix}[{total_files} files]" pipeline_status["job_name"] = job_name + # 创建一个计数器,用于跟踪已处理的文件数量 + processed_count = 0 + # 创建一个信号量,限制并发处理文件的数量 + semaphore = asyncio.Semaphore(self.max_parallel_insert) + async def process_document( doc_id: str, status_doc: DocProcessingStatus, @@ -899,45 +899,95 @@ class LightRAG: split_by_character_only: bool, pipeline_status: dict, pipeline_status_lock: asyncio.Lock, + semaphore: asyncio.Semaphore, ) -> None: """Process single document""" - try: - # Get file path from status document - file_path = getattr(status_doc, "file_path", "unknown_source") + # 使用信号量控制并发 + async with semaphore: + nonlocal processed_count + # 获取并保存当前文件的序号 + current_file_number = 0 + try: + # Get file path from status document + file_path = getattr(status_doc, "file_path", "unknown_source") - async with pipeline_status_lock: - log_message = f"Processing file: {file_path}" - logger.info(log_message) - pipeline_status["history_messages"].append(log_message) - log_message = f"Processing d-id: {doc_id}" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + async with pipeline_status_lock: + # 更新已处理文件数量并保存当前文件序号 + processed_count += 1 + current_file_number = processed_count # 保存当前文件的序号 + pipeline_status["cur_batch"] = processed_count + + log_message = f"Processing file ({current_file_number}/{total_files}): {file_path}" + logger.info(log_message) + pipeline_status["history_messages"].append(log_message) + log_message = f"Processing d-id: {doc_id}" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - # Generate chunks from document - chunks: dict[str, Any] = { - compute_mdhash_id(dp["content"], 
prefix="chunk-"): { - **dp, - "full_doc_id": doc_id, - "file_path": file_path, # Add file path to each chunk + # Generate chunks from document + chunks: dict[str, Any] = { + compute_mdhash_id(dp["content"], prefix="chunk-"): { + **dp, + "full_doc_id": doc_id, + "file_path": file_path, # Add file path to each chunk + } + for dp in self.chunking_func( + self.tokenizer, + status_doc.content, + split_by_character, + split_by_character_only, + self.chunk_overlap_token_size, + self.chunk_token_size, + ) } - for dp in self.chunking_func( - self.tokenizer, - status_doc.content, - split_by_character, - split_by_character_only, - self.chunk_overlap_token_size, - self.chunk_token_size, - ) - } - # Process document (text chunks and full docs) in parallel - # Create tasks with references for potential cancellation - doc_status_task = asyncio.create_task( - self.doc_status.upsert( + # Process document (text chunks and full docs) in parallel + # Create tasks with references for potential cancellation + doc_status_task = asyncio.create_task( + self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PROCESSING, + "chunks_count": len(chunks), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } + } + ) + ) + chunks_vdb_task = asyncio.create_task( + self.chunks_vdb.upsert(chunks) + ) + entity_relation_task = asyncio.create_task( + self._process_entity_relation_graph( + chunks, pipeline_status, pipeline_status_lock + ) + ) + full_docs_task = asyncio.create_task( + self.full_docs.upsert( + {doc_id: {"content": status_doc.content}} + ) + ) + text_chunks_task = asyncio.create_task( + self.text_chunks.upsert(chunks) + ) + tasks = [ + doc_status_task, + chunks_vdb_task, + entity_relation_task, + full_docs_task, + text_chunks_task, + ] + await asyncio.gather(*tasks) + await self.doc_status.upsert( { doc_id: { - "status": DocStatus.PROCESSING, + "status": DocStatus.PROCESSED, "chunks_count": len(chunks), "content": status_doc.content, "content_summary": status_doc.content_summary, @@ -948,112 +998,67 @@ class LightRAG: } } ) - ) - chunks_vdb_task = asyncio.create_task( - self.chunks_vdb.upsert(chunks) - ) - entity_relation_task = asyncio.create_task( - self._process_entity_relation_graph( - chunks, pipeline_status, pipeline_status_lock - ) - ) - full_docs_task = asyncio.create_task( - self.full_docs.upsert( - {doc_id: {"content": status_doc.content}} - ) - ) - text_chunks_task = asyncio.create_task( - self.text_chunks.upsert(chunks) - ) - tasks = [ - doc_status_task, - chunks_vdb_task, - entity_relation_task, - full_docs_task, - text_chunks_task, - ] - await asyncio.gather(*tasks) - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.PROCESSED, - "chunks_count": len(chunks), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, + + # 每处理完一个文件,就调用一次 _insert_done + await self._insert_done() + + async with pipeline_status_lock: + log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) + + except Exception as e: + # Log error and update pipeline status + error_msg 
= f"Failed to process document {doc_id}: {traceback.format_exc()}" + + logger.error(error_msg) + async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append(error_msg) + + # Cancel other tasks as they are no longer meaningful + for task in [ + chunks_vdb_task, + entity_relation_task, + full_docs_task, + text_chunks_task, + ]: + if not task.done(): + task.cancel() + # Update document status to failed + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error": str(e), + "content": status_doc.content, + "content_summary": status_doc.content_summary, + "content_length": status_doc.content_length, + "created_at": status_doc.created_at, + "updated_at": datetime.now().isoformat(), + "file_path": file_path, + } } - } + ) + + # 创建所有文档的处理任务 + doc_tasks = [] + for doc_id, status_doc in to_process_docs.items(): + doc_tasks.append( + process_document( + doc_id, + status_doc, + split_by_character, + split_by_character_only, + pipeline_status, + pipeline_status_lock, + semaphore, ) - except Exception as e: - # Log error and update pipeline status - error_msg = f"Failed to process document {doc_id}: {traceback.format_exc()}" - - logger.error(error_msg) - async with pipeline_status_lock: - pipeline_status["latest_message"] = error_msg - pipeline_status["history_messages"].append(error_msg) - - # Cancel other tasks as they are no longer meaningful - for task in [ - chunks_vdb_task, - entity_relation_task, - full_docs_task, - text_chunks_task, - ]: - if not task.done(): - task.cancel() - # Update document status to failed - await self.doc_status.upsert( - { - doc_id: { - "status": DocStatus.FAILED, - "error": str(e), - "content": status_doc.content, - "content_summary": status_doc.content_summary, - "content_length": status_doc.content_length, - "created_at": status_doc.created_at, - "updated_at": datetime.now().isoformat(), - "file_path": file_path, - } - } - ) - - # 3. iterate over batches - total_batches = len(docs_batches) - for batch_idx, docs_batch in enumerate(docs_batches): - current_batch = batch_idx + 1 - log_message = ( - f"Start processing batch {current_batch} of {total_batches}." ) - logger.info(log_message) - pipeline_status["cur_batch"] = current_batch - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - doc_tasks = [] - for doc_id, status_doc in docs_batch: - doc_tasks.append( - process_document( - doc_id, - status_doc, - split_by_character, - split_by_character_only, - pipeline_status, - pipeline_status_lock, - ) - ) - - # Process documents in one batch parallelly - await asyncio.gather(*doc_tasks) - await self._insert_done() - - log_message = f"Completed batch {current_batch} of {total_batches}." 
- logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) + # 等待所有文档处理完成 + await asyncio.gather(*doc_tasks) # Check if there's a pending request to process more documents (with lock) has_pending_request = False From fc425f1397338f546c236c7c9cc4531a0fe0eb74 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 24 Apr 2025 14:00:43 +0800 Subject: [PATCH 2/4] Send all found files to pipeline at once --- lightrag/api/routers/document_routes.py | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 0111e237..de471f29 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -674,27 +674,9 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): if not new_files: return - # Get MAX_PARALLEL_INSERT from global_args - max_parallel = global_args.max_parallel_insert - # Calculate batch size as 2 * MAX_PARALLEL_INSERT - batch_size = 2 * max_parallel - - # Process files in batches - for i in range(0, total_files, batch_size): - batch_files = new_files[i : i + batch_size] - batch_num = i // batch_size + 1 - total_batches = (total_files + batch_size - 1) // batch_size - - logger.info( - f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files" - ) - await pipeline_index_files(rag, batch_files) - - # Log progress - processed = min(i + batch_size, total_files) - logger.info( - f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)" - ) + # Process all files at once + await pipeline_index_files(rag, new_files) + logger.info(f"Processed {total_files}/{total_files} files (100.0%)") except Exception as e: logger.error(f"Error during scanning process: {str(e)}") From 3aab5b41f2b15880882bb86c4b5d31d63c0ba422 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 24 Apr 2025 14:15:10 +0800 Subject: [PATCH 3/4] Fix linting --- lightrag/api/routers/document_routes.py | 4 +-- lightrag/lightrag.py | 40 +++++++++++++------------ 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index de471f29..d4421cf6 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -675,8 +675,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): return # Process all files at once - await pipeline_index_files(rag, new_files) - logger.info(f"Processed {total_files}/{total_files} files (100.0%)") + await pipeline_index_files(rag, new_files) + logger.info(f"Scanning process completed: {total_files} files Processed.") except Exception as e: logger.error(f"Error during scanning process: {str(e)}") diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 742db478..584d14b4 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -841,8 +841,8 @@ class LightRAG: "job_name": "Default Job", "job_start": datetime.now().isoformat(), "docs": 0, - "batchs": 0, # 将被重新定义为待处理的文件总数 - "cur_batch": 0, # 将被重新定义为当前处理的第几个文件 + "batchs": 0, # Total number of files to be processed + "cur_batch": 0, # Number of files already processed "request_pending": False, # Clear any previous request "latest_message": "", } @@ -870,10 +870,10 @@ class LightRAG: log_message = f"Processing {len(to_process_docs)} document(s)" logger.info(log_message) - # 更新 pipeline_status,batchs 现在表示待处理的文件总数 + # Update 
pipeline_status, batchs now represents the total number of files to be processed pipeline_status["docs"] = len(to_process_docs) pipeline_status["batchs"] = len(to_process_docs) - pipeline_status["cur_batch"] = 0 # 初始化为0,表示当前已处理的文件数量 + pipeline_status["cur_batch"] = 0 pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) @@ -887,9 +887,9 @@ class LightRAG: job_name = f"{path_prefix}[{total_files} files]" pipeline_status["job_name"] = job_name - # 创建一个计数器,用于跟踪已处理的文件数量 + # Create a counter to track the number of processed files processed_count = 0 - # 创建一个信号量,限制并发处理文件的数量 + # Create a semaphore to limit the number of concurrent file processing semaphore = asyncio.Semaphore(self.max_parallel_insert) async def process_document( @@ -902,21 +902,23 @@ class LightRAG: semaphore: asyncio.Semaphore, ) -> None: """Process single document""" - # 使用信号量控制并发 async with semaphore: nonlocal processed_count - # 获取并保存当前文件的序号 current_file_number = 0 try: # Get file path from status document - file_path = getattr(status_doc, "file_path", "unknown_source") + file_path = getattr( + status_doc, "file_path", "unknown_source" + ) async with pipeline_status_lock: - # 更新已处理文件数量并保存当前文件序号 + # Update processed file count and save current file number processed_count += 1 - current_file_number = processed_count # 保存当前文件的序号 + current_file_number = ( + processed_count # Save the current file number + ) pipeline_status["cur_batch"] = processed_count - + log_message = f"Processing file ({current_file_number}/{total_files}): {file_path}" logger.info(log_message) pipeline_status["history_messages"].append(log_message) @@ -998,16 +1000,16 @@ class LightRAG: } } ) - - # 每处理完一个文件,就调用一次 _insert_done + + # Call _insert_done after processing each file await self._insert_done() - + async with pipeline_status_lock: log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}" logger.info(log_message) pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - + except Exception as e: # Log error and update pipeline status error_msg = f"Failed to process document {doc_id}: {traceback.format_exc()}" @@ -1042,7 +1044,7 @@ class LightRAG: } ) - # 创建所有文档的处理任务 + # Create processing tasks for all documents doc_tasks = [] for doc_id, status_doc in to_process_docs.items(): doc_tasks.append( @@ -1057,7 +1059,7 @@ class LightRAG: ) ) - # 等待所有文档处理完成 + # Wait for all document processing to complete await asyncio.gather(*doc_tasks) # Check if there's a pending request to process more documents (with lock) @@ -1113,7 +1115,7 @@ class LightRAG: ) except Exception as e: logger.error( - f"Failed to extract entities and relationships : {traceback.format_exc()} 。" + f"Failed to extract entities and relationships : {traceback.format_exc()}" ) raise e From 7f0997290165c913ac6843f11ece6bcf9a1c81a3 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 24 Apr 2025 15:46:25 +0800 Subject: [PATCH 4/4] Optimize error log --- lightrag/lightrag.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 584d14b4..e6507c37 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1114,9 +1114,11 @@ class LightRAG: llm_response_cache=self.llm_response_cache, ) except Exception as e: - logger.error( - f"Failed to extract entities and relationships : {traceback.format_exc()}" - ) + error_msg = f"Failed to extract entities and relationships: {str(e)}" + logger.error(error_msg) + 
async with pipeline_status_lock: + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append(error_msg) raise e async def _insert_done(
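
The pattern this series converges on is semaphore-bounded concurrency: one task is created per document up front, and an asyncio.Semaphore caps how many run at the same time, instead of slicing the work into fixed-size batches. A minimal standalone sketch of that pattern follows; handle_doc and MAX_PARALLEL are hypothetical placeholders for illustration, not identifiers from lightrag.py.

    import asyncio

    MAX_PARALLEL = 2  # stand-in for a setting like max_parallel_insert

    async def handle_doc(doc_id: str, semaphore: asyncio.Semaphore) -> None:
        # Only MAX_PARALLEL coroutine bodies run at once; the rest wait here.
        async with semaphore:
            await asyncio.sleep(0.1)  # placeholder for chunking and upsert work
            print(f"finished {doc_id}")

    async def main() -> None:
        semaphore = asyncio.Semaphore(MAX_PARALLEL)
        # All tasks are created immediately; the semaphore, not batching,
        # limits parallelism, so a slow document never stalls a whole batch.
        await asyncio.gather(*(handle_doc(f"doc-{i}", semaphore) for i in range(5)))

    asyncio.run(main())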