From 1949c4a2c635898fbae4d7bfb481f2b5d0bbedb7 Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Sun, 9 Feb 2025 15:24:52 +0100
Subject: [PATCH] improved get status

---
 lightrag/lightrag.py | 96 ++++++++++++++++++++------------------
 1 file changed, 43 insertions(+), 53 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 00174fcd..3bfa2649 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -13,7 +13,6 @@ from .operate import (
     kg_query_with_keywords,
     mix_kg_vector_query,
     naive_query,
-    # local_query,global_query,hybrid_query,,
 )
 
 from .utils import (
@@ -28,6 +27,7 @@ from .base import (
     BaseGraphStorage,
     BaseKVStorage,
     BaseVectorStorage,
+    DocProcessingStatus,
     DocStatus,
     DocStatusStorage,
     QueryParam,
@@ -396,7 +396,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_enqueue_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )
 
     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -468,12 +470,12 @@
     async def apipeline_enqueue_documents(self, string_or_strings: str | list[str]):
         """
         Pipeline for Processing Documents
-
+
         1. Remove duplicate contents from the list
         2. Generate document IDs and initial status
         3. Filter out already processed documents
-        4. Enqueue document in status
-        """
+        4. Enqueue document in status
+        """
         if isinstance(string_or_strings, str):
             string_or_strings = [string_or_strings]
 
@@ -512,26 +514,6 @@
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
-    async def _get_pending_documents(self) -> list[str]:
-        """Fetch all pending and failed documents."""
-        to_process_doc_keys: list[str] = []
-
-        # Fetch failed documents
-        failed_docs = await self.doc_status.get_failed_docs()
-        if failed_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
-
-        # Fetch pending documents
-        pending_docs = await self.doc_status.get_pending_docs()
-        if pending_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return []
-
-        return to_process_doc_keys
-
     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
@@ -548,46 +530,53 @@
             4. Update the document status
         """
         # 1. get all pending and failed documents
-        pending_doc_ids = await self._get_pending_documents()
+        to_process_docs: dict[str, DocProcessingStatus] = {}
 
-        if not pending_doc_ids:
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_failed_docs()
+        to_process_docs.update(failed_docs)
+
+        pending_docs = await self.doc_status.get_pending_docs()
+        to_process_docs.update(pending_docs)
+
+        if not to_process_docs:
             logger.info("All documents have been processed or are duplicates")
-            return
+            return
 
+        to_process_docs_ids = list(to_process_docs.keys())
         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
-            pending_doc_ids
-        )
-        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(to_process_docs_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(to_process_docs_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size]
-            for i in range(0, len(pending_doc_ids), batch_size)
+            list(to_process_docs.items())[i : i + batch_size]
+            for i in range(0, len(to_process_docs), batch_size)
         ]
 
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
-        for batch_idx, doc_ids in tqdm_async(
+        for batch_idx, ids_doc_processing_status in tqdm_async(
             enumerate(batch_docs_list),
             desc="Process Batches",
         ):
             # 4. iterate over batch
-            for doc_id in tqdm_async(
-                doc_ids,
+            for id_doc_processing_status in tqdm_async(
+                ids_doc_processing_status,
                 desc=f"Process Batch {batch_idx}",
             ):
                 # Update status in processing
-                status_doc = await self.doc_status.get_by_id(doc_id)
+                id_doc, status_doc = id_doc_processing_status
+
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc: {
                             "status": DocStatus.PROCESSING,
                             "updated_at": datetime.now().isoformat(),
-                            "content_summary": status_doc["content_summary"],
-                            "content_length": status_doc["content_length"],
-                            "created_at": status_doc["created_at"],
+                            "content_summary": status_doc.content_summary,
+                            "content_length": status_doc.content_length,
+                            "created_at": status_doc.created_at,
                         }
                     }
                 )
@@ -595,10 +584,10 @@
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
                         **dp,
-                        "full_doc_id": doc_id,
+                        "full_doc_id": id_doc,
                     }
                     for dp in self.chunking_func(
-                        status_doc["content"],
+                        status_doc.content,
                         split_by_character,
                         split_by_character_only,
                         self.chunk_overlap_token_size,
@@ -611,25 +600,26 @@
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)
 
+                tasks[id_doc] = []
                 # Check if document already processed the doc
-                if doc_id not in full_docs_processed_doc_ids:
-                    tasks[doc_id].append(
+                if id_doc not in full_docs_processed_doc_ids:
+                    tasks[id_doc].append(
                         self.full_docs.upsert(
-                            {doc_id: {"content": status_doc["content"]}}
+                            {id_doc: {"content": status_doc.content}}
                         )
                     )
 
                 # Check if chunks already processed the doc
-                if doc_id not in text_chunks_processed_doc_ids:
-                    tasks[doc_id].append(self.text_chunks.upsert(chunks))
+                if id_doc not in text_chunks_processed_doc_ids:
+                    tasks[id_doc].append(self.text_chunks.upsert(chunks))
 
         # Process document (text chunks and full docs) in parallel
-        for doc_id, task in tasks.items():
+        for id_doc_processing_status, task in tasks.items():
             try:
                 await asyncio.gather(*task)
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc_processing_status: {
                             "status": DocStatus.PROCESSED,
                             "chunks_count": len(chunks),
                             "updated_at": datetime.now().isoformat(),
@@ -639,10 +629,10 @@
                 await self._insert_done()
 
             except Exception as e:
-                logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                logger.error(f"Failed to process document {id_doc_processing_status}: {str(e)}")
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc_processing_status: {
                             "status": DocStatus.FAILED,
                             "error": str(e),
                             "updated_at": datetime.now().isoformat(),
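
For reference, a minimal sketch of how the two pipeline stages touched by this patch might be driven by a caller. The constructor arguments, the working directory, the document strings, and the main() wrapper below are illustrative assumptions, not part of the patch; only the two apipeline_* method calls come from the code above.

import asyncio

from lightrag import LightRAG


async def main() -> None:
    # Illustrative setup only: real usage configures storage backends and
    # model functions as appropriate for the deployment.
    rag = LightRAG(working_dir="./rag_storage")

    # Stage 1: deduplicate the inputs, generate document IDs, and enqueue
    # them in the document status store.
    await rag.apipeline_enqueue_documents(["first document text", "second document text"])

    # Stage 2: pick up pending and failed documents, split them into chunks,
    # extract entities/relations, and mark each document PROCESSED or FAILED.
    await rag.apipeline_process_enqueue_documents()


asyncio.run(main())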