From abcdcd5a73b15bb49b2b6c5d302a78901f6c6b37 Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Sun, 9 Feb 2025 14:36:49 +0100
Subject: [PATCH] cleaned docs

---
 examples/lightrag_oracle_demo.py |  5 ++-
 lightrag/lightrag.py             | 53 +++++++++++++++++---------------
 2 files changed, 31 insertions(+), 27 deletions(-)

diff --git a/examples/lightrag_oracle_demo.py b/examples/lightrag_oracle_demo.py
index 47020fd6..f5269fae 100644
--- a/examples/lightrag_oracle_demo.py
+++ b/examples/lightrag_oracle_demo.py
@@ -121,9 +121,8 @@ async def main():
         texts = [x for x in all_text.split("\n") if x]

         # New mode use pipeline
-        await rag.apipeline_process_documents(texts)
-        await rag.apipeline_process_chunks()
-        await rag.apipeline_process_extract_graph()
+        await rag.apipeline_enqueue_documents(texts)
+        await rag.apipeline_process_enqueue_documents()

         # Old method use ainsert
         # await rag.ainsert(texts)
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b2049d7f..ef4a9db5 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -395,7 +395,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_process_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )

     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -511,23 +513,23 @@ class LightRAG:
     async def _get_pending_documents(self) -> list[str]:
         """Fetch all pending and failed documents."""
         to_process_doc_keys: list[str] = []
-        
+
         # Fetch failed documents
         failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if failed_docs:
             to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
-        
+
         # Fetch pending documents
         pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if pending_docs:
             to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
-        
+
         if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
             return []
-        
+
         return to_process_doc_keys
-        
+
     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
@@ -535,36 +537,39 @@ class LightRAG:
     ) -> None:
         """
         Process pending documents by splitting them into chunks, processing
-        each chunk for entity and relation extraction, and updating the 
+        each chunk for entity and relation extraction, and updating the
         document status.
-        
+
         1. Get all pending and failed documents
         2. Split document content into chunks
         3. Process each chunk for entity and relation extraction
         4. Update the document status
-        """ 
+        """
         # 1. get all pending and failed documents
         pending_doc_ids = await self._get_pending_documents()
-        
+
         if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return
-        
+
         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
+            pending_doc_ids
+        )
         full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)

         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
+            pending_doc_ids[i : i + batch_size]
+            for i in range(0, len(pending_doc_ids), batch_size)
         ]
-        
+
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
         for batch_idx, doc_ids in tqdm_async(
             enumerate(batch_docs_list),
-            desc=f"Process Batches",
+            desc="Process Batches",
         ):
             # 4. iterate over batch
             for doc_id in tqdm_async(
@@ -580,7 +585,7 @@ class LightRAG:
                             "updated_at": datetime.now().isoformat(),
                             "content_summary": status_doc["content_summary"],
                             "content_length": status_doc["content_length"],
-                            "created_at": status_doc["created_at"], 
+                            "created_at": status_doc["created_at"],
                         }
                     }
                 )
@@ -599,22 +604,24 @@ class LightRAG:
                         self.tiktoken_model_name,
                     )
                 }
-                
-                # Ensure chunk insertion and graph processing happen sequentially, not in parallel 
+
+                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)

                 # Check if document already processed the doc
                 if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
+                        self.full_docs.upsert(
+                            {doc_id: {"content": status_doc["content"]}}
+                        )
                     )
-                
+
                 # Check if chunks already processed the doc
                 if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))

-        # Process document (text chunks and full docs) in parallel 
+        # Process document (text chunks and full docs) in parallel
         for doc_id, task in tasks.items():
             try:
                 await asyncio.gather(*task)
@@ -630,9 +637,7 @@ class LightRAG:
                 await self._insert_done()

             except Exception as e:
-                logger.error(
-                    f"Failed to process document {doc_id}: {str(e)}"
-                )
+                logger.error(f"Failed to process document {doc_id}: {str(e)}")
                 await self.doc_status.upsert(
                     {
                         doc_id: {