diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 45352e21..742db478 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -841,8 +841,8 @@ class LightRAG:
             "job_name": "Default Job",
             "job_start": datetime.now().isoformat(),
             "docs": 0,
-            "batchs": 0,
-            "cur_batch": 0,
+            "batchs": 0,  # Will be redefined as the total number of files to process
+            "cur_batch": 0,  # Will be redefined as the sequence number of the file currently being processed
             "request_pending": False,  # Clear any previous request
             "latest_message": "",
         }
@@ -867,18 +867,13 @@ class LightRAG:
                     pipeline_status["history_messages"].append(log_message)
                     break
 
-                # 2. split docs into chunks, insert chunks, update doc status
-                docs_batches = [
-                    list(to_process_docs.items())[i : i + self.max_parallel_insert]
-                    for i in range(0, len(to_process_docs), self.max_parallel_insert)
-                ]
-
-                log_message = f"Processing {len(to_process_docs)} document(s) in {len(docs_batches)} batches"
+                log_message = f"Processing {len(to_process_docs)} document(s)"
                 logger.info(log_message)
 
-                # Update pipeline status with current batch information
+                # Update pipeline_status: "batchs" now holds the total number of files to process
                 pipeline_status["docs"] = len(to_process_docs)
-                pipeline_status["batchs"] = len(docs_batches)
+                pipeline_status["batchs"] = len(to_process_docs)
+                pipeline_status["cur_batch"] = 0  # Initialize to 0: the number of files processed so far
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
 
@@ -892,6 +887,11 @@ class LightRAG:
                 job_name = f"{path_prefix}[{total_files} files]"
                 pipeline_status["job_name"] = job_name
 
+                # Create a counter to track the number of files processed
+                processed_count = 0
+                # Create a semaphore to limit how many files are processed concurrently
+                semaphore = asyncio.Semaphore(self.max_parallel_insert)
+
                 async def process_document(
                     doc_id: str,
                     status_doc: DocProcessingStatus,
@@ -899,45 +899,95 @@ class LightRAG:
                     split_by_character_only: bool,
                     pipeline_status: dict,
                     pipeline_status_lock: asyncio.Lock,
+                    semaphore: asyncio.Semaphore,
                 ) -> None:
                     """Process single document"""
-                    try:
-                        # Get file path from status document
-                        file_path = getattr(status_doc, "file_path", "unknown_source")
+                    # Use the semaphore to limit concurrency
+                    async with semaphore:
+                        nonlocal processed_count
+                        # Will hold the current file's sequence number
+                        current_file_number = 0
+                        try:
+                            # Get file path from status document
+                            file_path = getattr(status_doc, "file_path", "unknown_source")
 
-                        async with pipeline_status_lock:
-                            log_message = f"Processing file: {file_path}"
-                            logger.info(log_message)
-                            pipeline_status["history_messages"].append(log_message)
-                            log_message = f"Processing d-id: {doc_id}"
-                            logger.info(log_message)
-                            pipeline_status["latest_message"] = log_message
-                            pipeline_status["history_messages"].append(log_message)
+                            async with pipeline_status_lock:
+                                # Update the processed-file count and record this file's sequence number
+                                processed_count += 1
+                                current_file_number = processed_count  # Save the current file's sequence number
+                                pipeline_status["cur_batch"] = processed_count
+
+                                log_message = f"Processing file ({current_file_number}/{total_files}): {file_path}"
+                                logger.info(log_message)
+                                pipeline_status["history_messages"].append(log_message)
+                                log_message = f"Processing d-id: {doc_id}"
+                                logger.info(log_message)
+                                pipeline_status["latest_message"] = log_message
+                                pipeline_status["history_messages"].append(log_message)
 
-                        # Generate chunks from document
-                        chunks: dict[str, Any] = {
-                            compute_mdhash_id(dp["content"], prefix="chunk-"): {
-                                **dp,
-                                "full_doc_id": doc_id,
-                                "file_path": file_path,  # Add file path to each chunk
+                            # Generate chunks from document
+                            chunks: dict[str, Any] = {
+                                compute_mdhash_id(dp["content"], prefix="chunk-"): {
+                                    **dp,
+                                    "full_doc_id": doc_id,
+                                    "file_path": file_path,  # Add file path to each chunk
+                                }
+                                for dp in self.chunking_func(
+                                    self.tokenizer,
+                                    status_doc.content,
+                                    split_by_character,
+                                    split_by_character_only,
+                                    self.chunk_overlap_token_size,
+                                    self.chunk_token_size,
+                                )
                             }
-                            for dp in self.chunking_func(
-                                self.tokenizer,
-                                status_doc.content,
-                                split_by_character,
-                                split_by_character_only,
-                                self.chunk_overlap_token_size,
-                                self.chunk_token_size,
-                            )
-                        }
 
-                        # Process document (text chunks and full docs) in parallel
-                        # Create tasks with references for potential cancellation
-                        doc_status_task = asyncio.create_task(
-                            self.doc_status.upsert(
+                            # Process document (text chunks and full docs) in parallel
+                            # Create tasks with references for potential cancellation
+                            doc_status_task = asyncio.create_task(
+                                self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.PROCESSING,
+                                            "chunks_count": len(chunks),
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                            "updated_at": datetime.now().isoformat(),
+                                            "file_path": file_path,
+                                        }
+                                    }
+                                )
+                            )
+                            chunks_vdb_task = asyncio.create_task(
+                                self.chunks_vdb.upsert(chunks)
+                            )
+                            entity_relation_task = asyncio.create_task(
+                                self._process_entity_relation_graph(
+                                    chunks, pipeline_status, pipeline_status_lock
+                                )
+                            )
+                            full_docs_task = asyncio.create_task(
+                                self.full_docs.upsert(
+                                    {doc_id: {"content": status_doc.content}}
+                                )
+                            )
+                            text_chunks_task = asyncio.create_task(
+                                self.text_chunks.upsert(chunks)
+                            )
+                            tasks = [
+                                doc_status_task,
+                                chunks_vdb_task,
+                                entity_relation_task,
+                                full_docs_task,
+                                text_chunks_task,
+                            ]
+                            await asyncio.gather(*tasks)
+                            await self.doc_status.upsert(
                                 {
                                     doc_id: {
-                                        "status": DocStatus.PROCESSING,
+                                        "status": DocStatus.PROCESSED,
                                         "chunks_count": len(chunks),
                                         "content": status_doc.content,
                                         "content_summary": status_doc.content_summary,
@@ -948,112 +998,67 @@ class LightRAG:
                                     }
                                 }
                             )
-                        )
-                        chunks_vdb_task = asyncio.create_task(
-                            self.chunks_vdb.upsert(chunks)
-                        )
-                        entity_relation_task = asyncio.create_task(
-                            self._process_entity_relation_graph(
-                                chunks, pipeline_status, pipeline_status_lock
-                            )
-                        )
-                        full_docs_task = asyncio.create_task(
-                            self.full_docs.upsert(
-                                {doc_id: {"content": status_doc.content}}
-                            )
-                        )
-                        text_chunks_task = asyncio.create_task(
-                            self.text_chunks.upsert(chunks)
-                        )
-                        tasks = [
-                            doc_status_task,
-                            chunks_vdb_task,
-                            entity_relation_task,
-                            full_docs_task,
-                            text_chunks_task,
-                        ]
-                        await asyncio.gather(*tasks)
-                        await self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.PROCESSED,
-                                    "chunks_count": len(chunks),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
-                                    "updated_at": datetime.now().isoformat(),
-                                    "file_path": file_path,
+
+                            # Call _insert_done once after each file is processed
+                            await self._insert_done()
+
+                            async with pipeline_status_lock:
+                                log_message = f"Completed processing file {current_file_number}/{total_files}: {file_path}"
+                                logger.info(log_message)
+                                pipeline_status["latest_message"] = log_message
+                                pipeline_status["history_messages"].append(log_message)
+
+                        except Exception as e:
+                            # Log error and update pipeline status
+                            error_msg = f"Failed to process document {doc_id}: {traceback.format_exc()}"
+
+                            logger.error(error_msg)
+                            async with pipeline_status_lock:
+                                pipeline_status["latest_message"] = error_msg
+                                pipeline_status["history_messages"].append(error_msg)
+
+                            # Cancel other tasks as they are no longer meaningful
+                            for task in [
+                                chunks_vdb_task,
+                                entity_relation_task,
+                                full_docs_task,
+                                text_chunks_task,
+                            ]:
+                                if not task.done():
+                                    task.cancel()
+                            # Update document status to failed
+                            await self.doc_status.upsert(
+                                {
+                                    doc_id: {
+                                        "status": DocStatus.FAILED,
+                                        "error": str(e),
+                                        "content": status_doc.content,
+                                        "content_summary": status_doc.content_summary,
+                                        "content_length": status_doc.content_length,
+                                        "created_at": status_doc.created_at,
+                                        "updated_at": datetime.now().isoformat(),
+                                        "file_path": file_path,
+                                    }
                                 }
-                            }
+                            )
+
+                # Create processing tasks for all documents
+                doc_tasks = []
+                for doc_id, status_doc in to_process_docs.items():
+                    doc_tasks.append(
+                        process_document(
+                            doc_id,
+                            status_doc,
+                            split_by_character,
+                            split_by_character_only,
+                            pipeline_status,
+                            pipeline_status_lock,
+                            semaphore,
                         )
-                    except Exception as e:
-                        # Log error and update pipeline status
-                        error_msg = f"Failed to process document {doc_id}: {traceback.format_exc()}"
-
-                        logger.error(error_msg)
-                        async with pipeline_status_lock:
-                            pipeline_status["latest_message"] = error_msg
-                            pipeline_status["history_messages"].append(error_msg)
-
-                        # Cancel other tasks as they are no longer meaningful
-                        for task in [
-                            chunks_vdb_task,
-                            entity_relation_task,
-                            full_docs_task,
-                            text_chunks_task,
-                        ]:
-                            if not task.done():
-                                task.cancel()
-                        # Update document status to failed
-                        await self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.FAILED,
-                                    "error": str(e),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
-                                    "updated_at": datetime.now().isoformat(),
-                                    "file_path": file_path,
-                                }
-                            }
-                        )
-
-                # 3. iterate over batches
-                total_batches = len(docs_batches)
-                for batch_idx, docs_batch in enumerate(docs_batches):
-                    current_batch = batch_idx + 1
-                    log_message = (
-                        f"Start processing batch {current_batch} of {total_batches}."
                     )
-                    logger.info(log_message)
-                    pipeline_status["cur_batch"] = current_batch
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-                    doc_tasks = []
-                    for doc_id, status_doc in docs_batch:
-                        doc_tasks.append(
-                            process_document(
-                                doc_id,
-                                status_doc,
-                                split_by_character,
-                                split_by_character_only,
-                                pipeline_status,
-                                pipeline_status_lock,
-                            )
-                        )
-
-                    # Process documents in one batch parallelly
-                    await asyncio.gather(*doc_tasks)
-                    await self._insert_done()
-
-                    log_message = f"Completed batch {current_batch} of {total_batches}."
-                    logger.info(log_message)
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
+                # Wait for all documents to finish processing
+                await asyncio.gather(*doc_tasks)
 
                 # Check if there's a pending request to process more documents (with lock)
                 has_pending_request = False
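
The concurrency pattern this patch moves to — create one task per document up front, let an `asyncio.Semaphore` sized by `max_parallel_insert` bound how many run at once, and track progress with a shared counter updated under a lock (fed into `pipeline_status["cur_batch"]`) — can be sketched in isolation. This is a minimal illustration only; the names `process_one` and `MAX_PARALLEL` and the `sleep` standing in for chunking and the storage upserts are hypothetical, not part of the patch:

```python
import asyncio

MAX_PARALLEL = 2  # plays the role of max_parallel_insert


async def main() -> None:
    semaphore = asyncio.Semaphore(MAX_PARALLEL)
    status_lock = asyncio.Lock()
    processed_count = 0  # plays the role of pipeline_status["cur_batch"]
    docs = [f"doc-{i}" for i in range(5)]
    total = len(docs)

    async def process_one(doc_id: str) -> None:
        nonlocal processed_count
        async with semaphore:  # at most MAX_PARALLEL bodies run at the same time
            async with status_lock:
                processed_count += 1
                current = processed_count  # this file's sequence number
            print(f"Processing file ({current}/{total}): {doc_id}")
            await asyncio.sleep(0.1)  # stand-in for chunking and the storage upserts
            print(f"Completed processing file {current}/{total}: {doc_id}")

    # One task per document, all created immediately; the semaphore does the throttling.
    await asyncio.gather(*(process_one(d) for d in docs))


if __name__ == "__main__":
    asyncio.run(main())
```

Compared with the old fixed-size batches, a document that finishes early immediately frees a slot for the next one instead of waiting for the slowest member of its batch.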