diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 1b1afdfc..8d9c1678 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -706,10 +706,27 @@ class LightRAG:
         pipeline_status_lock = get_pipeline_status_lock()
 
         # Check if another process is already processing the queue
-        process_documents = False
         async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
+                # First check whether there are any documents that need processing
+                processing_docs, failed_docs, pending_docs = await asyncio.gather(
+                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
+                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
+                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
+                )
+
+                to_process_docs: dict[str, DocProcessingStatus] = {}
+                to_process_docs.update(processing_docs)
+                to_process_docs.update(failed_docs)
+                to_process_docs.update(pending_docs)
+
+                # If there is nothing to process, return immediately and leave pipeline_status unchanged
+                if not to_process_docs:
+                    logger.info("No documents to process")
+                    return
+
+                # There are documents to process; update pipeline_status
                 pipeline_status.update(
                     {
                         "busy": True,
@@ -723,37 +740,18 @@ class LightRAG:
                     }
                 )
                 # Cleaning history_messages without breaking it as a shared list object
-                try:
-                    del pipeline_status["history_messages"][:]
-                except Exception as e:
-                    logger.error(f"Error clearing pipeline history_messages: {str(e)}")
-
-                process_documents = True
+                del pipeline_status["history_messages"][:]
             else:
                 # Another process is busy, just set request flag and return
                 pipeline_status["request_pending"] = True
                 logger.info(
                     "Another process is already processing the document queue. Request queued."
                 )
-
-        if not process_documents:
-            return
+                return
 
         try:
             # Process documents until no more documents or requests
             while True:
-                # 1. Get all pending, failed, and abnormally terminated processing documents.
-                processing_docs, failed_docs, pending_docs = await asyncio.gather(
-                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
-                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
-                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
-                )
-
-                to_process_docs: dict[str, DocProcessingStatus] = {}
-                to_process_docs.update(processing_docs)
-                to_process_docs.update(failed_docs)
-                to_process_docs.update(pending_docs)
-
                 if not to_process_docs:
                     log_message = "All documents have been processed or are duplicates"
                     logger.info(log_message)
@@ -761,20 +759,18 @@ class LightRAG:
                     pipeline_status["history_messages"].append(log_message)
                     break
 
-                # Update pipeline status with document count (with lock)
-                pipeline_status["docs"] = len(to_process_docs)
-
                 # 2. split docs into chunks, insert chunks, update doc status
                 docs_batches = [
                     list(to_process_docs.items())[i : i + self.max_parallel_insert]
                     for i in range(0, len(to_process_docs), self.max_parallel_insert)
                 ]
 
-                # Update pipeline status with batch information (directly, as it's atomic)
-                pipeline_status.update({"batchs": len(docs_batches), "cur_batch": 0})
-
                 log_message = f"Number of batches to process: {len(docs_batches)}."
                 logger.info(log_message)
+
+                # Update pipeline status with current batch information
+                pipeline_status["docs"] += len(to_process_docs)
+                pipeline_status["batchs"] += len(docs_batches)
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
 
@@ -782,7 +778,7 @@ class LightRAG:
                 # 3. iterate over batches
                 for batch_idx, docs_batch in enumerate(docs_batches):
                     # Update current batch in pipeline status (directly, as it's atomic)
-                    pipeline_status["cur_batch"] = batch_idx + 1
+                    pipeline_status["cur_batch"] += 1
 
                     async def batch(
                         batch_idx: int,
@@ -895,6 +891,18 @@ class LightRAG:
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
 
+                # Fetch newly enqueued documents that still need processing
+                processing_docs, failed_docs, pending_docs = await asyncio.gather(
+                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
+                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
+                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
+                )
+
+                to_process_docs = {}
+                to_process_docs.update(processing_docs)
+                to_process_docs.update(failed_docs)
+                to_process_docs.update(pending_docs)
+
         finally:
             log_message = "Document processing pipeline completed"
             logger.info(log_message)
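
For reference, the "check for work before claiming the pipeline" pattern this patch introduces can be sketched in isolation as below. Only `get_docs_by_status`, `DocStatus`, and the PROCESSING/FAILED/PENDING gather-then-merge order come from the diff; the `DocStatusStore` stub and `fetch_pending_work` helper are hypothetical stand-ins for illustration, not LightRAG APIs.

```python
# Minimal sketch (assumed names, not LightRAG's actual classes): query the three
# relevant states concurrently, merge them, and only then decide whether to mark
# the shared pipeline status as busy.
import asyncio
from enum import Enum


class DocStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


class DocStatusStore:
    """Hypothetical in-memory stand-in for self.doc_status."""

    def __init__(self, docs: dict[str, DocStatus]):
        self._docs = docs

    async def get_docs_by_status(self, status: DocStatus) -> dict[str, DocStatus]:
        return {doc_id: s for doc_id, s in self._docs.items() if s is status}


async def fetch_pending_work(doc_status: DocStatusStore) -> dict[str, DocStatus]:
    # Same gather-then-merge order as the patch: PROCESSING, FAILED, PENDING.
    processing, failed, pending = await asyncio.gather(
        doc_status.get_docs_by_status(DocStatus.PROCESSING),
        doc_status.get_docs_by_status(DocStatus.FAILED),
        doc_status.get_docs_by_status(DocStatus.PENDING),
    )
    to_process: dict[str, DocStatus] = {}
    to_process.update(processing)
    to_process.update(failed)
    to_process.update(pending)
    return to_process


async def main() -> None:
    store = DocStatusStore({"doc-1": DocStatus.PROCESSED, "doc-2": DocStatus.PENDING})
    to_process = await fetch_pending_work(store)
    if not to_process:
        # Early return: the shared pipeline status is left untouched.
        print("No documents to process")
        return
    print(f"Would mark pipeline busy for {len(to_process)} document(s)")


if __name__ == "__main__":
    asyncio.run(main())
```

Deferring the `busy` flag until work is confirmed is what lets the patch switch `docs`, `batchs`, and `cur_batch` to cumulative `+=` updates: an empty queue no longer resets the shared counters or the message history.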