diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 19b1b1cb..9369376e 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -80,6 +80,18 @@ def initialize_share_data(workers: int = 1):

     # Mark as initialized
     _initialized = True
+
+    # Initialize pipeline status for document indexing control
+    pipeline_namespace = get_namespace_data("pipeline_status")
+    pipeline_namespace.update({
+        "busy": False,  # Control concurrent processes
+        "job_name": "Default Job",  # Current job name (indexing files/indexing texts)
+        "job_start": None,  # Job start time
+        "docs": 0,  # Total number of documents to be indexed
+        "batchs": 0,  # Number of batches for processing documents
+        "cur_batch": 0,  # Current processing batch
+        "request_pending": False,  # Flag for pending request for processing
+    })


 def try_initialize_namespace(namespace: str) -> bool:
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 72f31315..b95da952 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -273,8 +273,6 @@ class LightRAG:

         from lightrag.kg.shared_storage import (
             initialize_share_data,
-            try_initialize_namespace,
-            get_namespace_data,
         )

         initialize_share_data()
@@ -672,117 +670,179 @@ class LightRAG:
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
-        # 1. Get all pending, failed, and abnormally terminated processing documents.
-        # Run the asynchronous status retrievals in parallel using asyncio.gather
-        processing_docs, failed_docs, pending_docs = await asyncio.gather(
-            self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
-            self.doc_status.get_docs_by_status(DocStatus.FAILED),
-            self.doc_status.get_docs_by_status(DocStatus.PENDING),
-        )
-
-        to_process_docs: dict[str, DocProcessingStatus] = {}
-        to_process_docs.update(processing_docs)
-        to_process_docs.update(failed_docs)
-        to_process_docs.update(pending_docs)
-
-        if not to_process_docs:
-            logger.info("All documents have been processed or are duplicates")
+        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+
+        # Get pipeline status shared data and lock
+        pipeline_status = get_namespace_data("pipeline_status")
+        storage_lock = get_storage_lock()
+
+        # Check if another process is already processing the queue
+        process_documents = False
+        with storage_lock:
+            if not pipeline_status.get("busy", False):
+                # No other process is busy, we can process documents
+                pipeline_status.update({
+                    "busy": True,
+                    "job_name": "indexing files",
+                    "job_start": datetime.now().isoformat(),
+                    "docs": 0,
+                    "batchs": 0,
+                    "cur_batch": 0,
+                    "request_pending": False  # Clear any previous request
+                })
+                process_documents = True
+            else:
+                # Another process is busy, just set request flag and return
+                pipeline_status["request_pending"] = True
+                logger.info("Another process is already processing the document queue. Request queued.")
+
+        if not process_documents:
             return
+
+        try:
+            # Process documents until no more documents or requests
+            while True:
+                # 1. Get all pending, failed, and abnormally terminated processing documents.
+                processing_docs, failed_docs, pending_docs = await asyncio.gather(
+                    self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
+                    self.doc_status.get_docs_by_status(DocStatus.FAILED),
+                    self.doc_status.get_docs_by_status(DocStatus.PENDING),
+                )

-        # 2. split docs into chunks, insert chunks, update doc status
-        docs_batches = [
-            list(to_process_docs.items())[i : i + self.max_parallel_insert]
-            for i in range(0, len(to_process_docs), self.max_parallel_insert)
-        ]
+                to_process_docs: dict[str, DocProcessingStatus] = {}
+                to_process_docs.update(processing_docs)
+                to_process_docs.update(failed_docs)
+                to_process_docs.update(pending_docs)

-        logger.info(f"Number of batches to process: {len(docs_batches)}.")
+                if not to_process_docs:
+                    logger.info("All documents have been processed or are duplicates")
+                    break

-        batches: list[Any] = []
-        # 3. iterate over batches
-        for batch_idx, docs_batch in enumerate(docs_batches):
+                # Update pipeline status with document count (with lock)
+                with storage_lock:
+                    pipeline_status["docs"] = len(to_process_docs)
+
+                # 2. split docs into chunks, insert chunks, update doc status
+                docs_batches = [
+                    list(to_process_docs.items())[i : i + self.max_parallel_insert]
+                    for i in range(0, len(to_process_docs), self.max_parallel_insert)
+                ]

-            async def batch(
-                batch_idx: int,
-                docs_batch: list[tuple[str, DocProcessingStatus]],
-                size_batch: int,
-            ) -> None:
-                logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
-                # 4. iterate over batch
-                for doc_id_processing_status in docs_batch:
-                    doc_id, status_doc = doc_id_processing_status
-                    # Generate chunks from document
-                    chunks: dict[str, Any] = {
-                        compute_mdhash_id(dp["content"], prefix="chunk-"): {
-                            **dp,
-                            "full_doc_id": doc_id,
-                        }
-                        for dp in self.chunking_func(
-                            status_doc.content,
-                            split_by_character,
-                            split_by_character_only,
-                            self.chunk_overlap_token_size,
-                            self.chunk_token_size,
-                            self.tiktoken_model_name,
-                        )
-                    }
-                    # Process document (text chunks and full docs) in parallel
-                    tasks = [
-                        self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.PROCESSING,
-                                    "updated_at": datetime.now().isoformat(),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
+                # Update pipeline status with batch information (directly, as it's atomic)
+                pipeline_status.update({
+                    "batchs": len(docs_batches),
+                    "cur_batch": 0
+                })
+
+                logger.info(f"Number of batches to process: {len(docs_batches)}.")
+
+                batches: list[Any] = []
+                # 3. iterate over batches
+                for batch_idx, docs_batch in enumerate(docs_batches):
+                    # Update current batch in pipeline status (directly, as it's atomic)
+                    pipeline_status["cur_batch"] = batch_idx + 1
+
+                    async def batch(
+                        batch_idx: int,
+                        docs_batch: list[tuple[str, DocProcessingStatus]],
+                        size_batch: int,
+                    ) -> None:
+                        logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
+                        # 4. iterate over batch
+                        for doc_id_processing_status in docs_batch:
+                            doc_id, status_doc = doc_id_processing_status
+                            # Generate chunks from document
+                            chunks: dict[str, Any] = {
+                                compute_mdhash_id(dp["content"], prefix="chunk-"): {
+                                    **dp,
+                                    "full_doc_id": doc_id,
                                 }
+                                for dp in self.chunking_func(
+                                    status_doc.content,
+                                    split_by_character,
+                                    split_by_character_only,
+                                    self.chunk_overlap_token_size,
+                                    self.chunk_token_size,
+                                    self.tiktoken_model_name,
+                                )
                             }
-                        ),
-                        self.chunks_vdb.upsert(chunks),
-                        self._process_entity_relation_graph(chunks),
-                        self.full_docs.upsert(
-                            {doc_id: {"content": status_doc.content}}
-                        ),
-                        self.text_chunks.upsert(chunks),
-                    ]
-                    try:
-                        await asyncio.gather(*tasks)
-                        await self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.PROCESSED,
-                                    "chunks_count": len(chunks),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
-                                    "updated_at": datetime.now().isoformat(),
-                                }
-                            }
-                        )
-                    except Exception as e:
-                        logger.error(f"Failed to process document {doc_id}: {str(e)}")
-                        await self.doc_status.upsert(
-                            {
-                                doc_id: {
-                                    "status": DocStatus.FAILED,
-                                    "error": str(e),
-                                    "content": status_doc.content,
-                                    "content_summary": status_doc.content_summary,
-                                    "content_length": status_doc.content_length,
-                                    "created_at": status_doc.created_at,
-                                    "updated_at": datetime.now().isoformat(),
-                                }
-                            }
-                        )
-                        continue
-                logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
+                            # Process document (text chunks and full docs) in parallel
+                            tasks = [
+                                self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.PROCESSING,
+                                            "updated_at": datetime.now().isoformat(),
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                        }
+                                    }
+                                ),
+                                self.chunks_vdb.upsert(chunks),
+                                self._process_entity_relation_graph(chunks),
+                                self.full_docs.upsert(
+                                    {doc_id: {"content": status_doc.content}}
+                                ),
+                                self.text_chunks.upsert(chunks),
+                            ]
+                            try:
+                                await asyncio.gather(*tasks)
+                                await self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.PROCESSED,
+                                            "chunks_count": len(chunks),
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                            "updated_at": datetime.now().isoformat(),
+                                        }
+                                    }
+                                )
+                            except Exception as e:
+                                logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                                await self.doc_status.upsert(
+                                    {
+                                        doc_id: {
+                                            "status": DocStatus.FAILED,
+                                            "error": str(e),
+                                            "content": status_doc.content,
+                                            "content_summary": status_doc.content_summary,
+                                            "content_length": status_doc.content_length,
+                                            "created_at": status_doc.created_at,
+                                            "updated_at": datetime.now().isoformat(),
+                                        }
+                                    }
+                                )
+                                continue
+                        logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")

-            batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
+                    batches.append(batch(batch_idx, docs_batch, len(docs_batches)))

-        await asyncio.gather(*batches)
-        await self._insert_done()
+                await asyncio.gather(*batches)
+                await self._insert_done()
+
+                # Check if there's a pending request to process more documents (with lock)
+                has_pending_request = False
+                with storage_lock:
+                    has_pending_request = pipeline_status.get("request_pending", False)
+                    if has_pending_request:
+                        # Clear the request flag before checking for more documents
+                        pipeline_status["request_pending"] = False
+
+                if not has_pending_request:
+                    break
+
+                logger.info("Processing additional documents due to pending request")
+
+        finally:
+            # Always reset busy status when done or if an exception occurs (with lock)
+            with storage_lock:
+                pipeline_status["busy"] = False
+                logger.info("Document processing pipeline completed")

     async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
         try:
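Note on the concurrency handshake introduced above: the first caller to find "busy" false takes ownership of the document queue; any caller arriving while it runs only sets "request_pending" and returns, and the owner loops once more before releasing "busy". The following standalone sketch models that handshake only — plain threads, a dict, and a threading.Lock stand in for LightRAG's shared-storage namespace and lock, and the worker IDs and sleep are hypothetical, not part of the patch:

    import threading
    import time

    # Stand-ins for the shared "pipeline_status" namespace and storage lock used in the patch.
    pipeline_status = {"busy": False, "request_pending": False}
    status_lock = threading.Lock()

    def process_enqueue_documents(worker_id: int) -> None:
        # Try to take ownership of the document queue.
        with status_lock:
            if pipeline_status["busy"]:
                # Another worker owns the queue: leave a request flag and return.
                pipeline_status["request_pending"] = True
                print(f"worker {worker_id}: queue busy, request queued")
                return
            pipeline_status["busy"] = True
            pipeline_status["request_pending"] = False

        try:
            while True:
                print(f"worker {worker_id}: draining document queue")
                time.sleep(0.1)  # stand-in for one full indexing pass

                with status_lock:
                    if not pipeline_status["request_pending"]:
                        break
                    # A request arrived while we were working: clear it and loop again.
                    pipeline_status["request_pending"] = False
        finally:
            # Always release ownership, even if the pass above raised.
            with status_lock:
                pipeline_status["busy"] = False

    workers = [threading.Thread(target=process_enqueue_documents, args=(i,)) for i in range(3)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()

The design choice mirrored here is that duplicate enqueue calls are never run concurrently and never silently dropped: they collapse into a single follow-up pass once the current owner finishes.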
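Note on reading progress: the fields seeded in shared_storage.py (busy, job_name, job_start, docs, batchs, cur_batch, request_pending — "batchs" is the spelling used throughout the patch) are written during indexing and can be read back through the same namespace. A minimal sketch of a progress read-out, assuming initialize_share_data() has already run in the current process; the report formatting is illustrative, not part of the library:

    from lightrag.kg.shared_storage import initialize_share_data, get_namespace_data

    initialize_share_data()  # creates and seeds the "pipeline_status" namespace (see shared_storage.py hunk)

    status = get_namespace_data("pipeline_status")
    if status.get("busy"):
        print(
            f"{status['job_name']} (started {status['job_start']}): "
            f"batch {status['cur_batch']}/{status['batchs']}, {status['docs']} docs queued"
        )
    else:
        print("Indexing pipeline is idle")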