From 6a4a77bfe95a7afdbc35e1f950e12e3181ec9dee Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Sun, 9 Feb 2025 14:11:11 +0100
Subject: [PATCH] make more clear

Extract the lookup of pending and failed documents out of
apipeline_process_chunks() into a _get_pending_documents() helper, and
simplify the batch loop to iterate over document ids, fetching each
status record on demand.

---
 lightrag/lightrag.py | 75 +++++++++++++++++++++++---------------------
 1 file changed, 39 insertions(+), 36 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f3a3ac9a..f9196cf3 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -509,6 +509,26 @@ class LightRAG:
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
+    async def _get_pending_documents(self) -> list[str]:
+        """Fetch the ids of all pending and failed documents."""
+        to_process_doc_keys: list[str] = []
+
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
+        if failed_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
+
+        # Fetch pending documents
+        pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
+        if pending_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
+
+        if not to_process_doc_keys:
+            logger.info("All documents have been processed or are duplicates")
+            return []
+
+        return to_process_doc_keys
+
     async def apipeline_process_chunks(
         self,
         split_by_character: str | None = None,
@@ -527,52 +547,36 @@
             when split_by_character is None, this parameter is ignored.
         """
         # 1. get all pending and failed documents
-        to_process_doc_keys: list[str] = []
-
-        # Process failes
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        # Process Pending
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return
-
-        # If included in text_chunks is all processed, return
-        new_docs = await self.doc_status.get_by_ids(to_process_doc_keys)
-        text_chunks_new_docs_ids = await self.text_chunks.filter_keys(
-            to_process_doc_keys
-        )
-        full_docs_new_docs_ids = await self.full_docs.filter_keys(to_process_doc_keys)
-
-        if not new_docs:
+        pending_doc_ids = await self._get_pending_documents()
+
+        if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return
+
+        # Get already processed documents (text chunks and full docs)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            new_docs[i : i + batch_size] for i in range(0, len(new_docs), batch_size)
+            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
         ]
-        for i, el in enumerate(batch_docs_list):
-            items = ((k, v) for d in el for k, v in d.items())
-
-            tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
-
+        batch_len = len(batch_docs_list)
+        # 3. iterate over batches
+        tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
+        for batch_idx, doc_ids in enumerate(batch_docs_list):
+
             doc_status: dict[str, Any] = {
                 "status": DocStatus.PROCESSING,
                 "updated_at": datetime.now().isoformat(),
             }
-            for doc_id, doc in tqdm_async(
-                items,
-                desc=f"Level 1 - Spliting doc in batch {i // len(batch_docs_list) + 1}",
+            for doc_id in tqdm_async(
+                doc_ids,
+                desc=f"Level 1 - Batch {batch_idx + 1} / {batch_len}",
             ):
+                doc = await self.doc_status.get_by_id(doc_id)
                 doc_status.update(
                     {
                         "content_summary": doc["content_summary"],
@@ -580,7 +584,6 @@
                         "created_at": doc["created_at"],
                     }
                 )
-
                 await self.doc_status.upsert({doc_id: doc_status})
 
                 # Generate chunks from document
@@ -614,12 +617,12 @@
                 )
                 await self.doc_status.upsert({doc_id: doc_status})
 
-                if doc_id not in full_docs_new_docs_ids:
+                if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
                         self.full_docs.upsert({doc_id: {"content": doc["content"]}})
                     )
 
-                if doc_id not in text_chunks_new_docs_ids:
+                if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))
 
         for doc_id, task in tasks.items():
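
Usage sketch: a minimal driver for the patched pipeline, assuming only the
async entry point shown in this diff and the standard lightrag package
layout; the LightRAG constructor arguments used here (working_dir,
addon_params) are illustrative, with "insert_batch_size" being the
addon_params key the batching code above actually reads.

    import asyncio

    from lightrag import LightRAG

    async def main() -> None:
        # "insert_batch_size" sets how many documents are chunked per
        # batch; apipeline_process_chunks() defaults it to 10 when unset.
        rag = LightRAG(
            working_dir="./rag_storage",  # illustrative path
            addon_params={"insert_batch_size": 20},
        )
        # Split every PENDING or FAILED document into chunks and persist
        # the chunks and full documents to their key-value stores.
        await rag.apipeline_process_chunks()

    asyncio.run(main())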