From 2929d1fc390e2a89c7ca0951817460cca08670e2 Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Sat, 8 Feb 2025 23:52:27 +0100
Subject: [PATCH] fixed pipe

---
 lightrag/lightrag.py | 44 +++++++++++++++++++++----------------------
 1 file changed, 21 insertions(+), 23 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f2b3ba68..0ae47d1f 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -586,7 +586,7 @@ class LightRAG:
         if update_storage:
             await self._insert_done()
 
-    async def apipeline_process_documents(self, string_or_strings):
+    async def apipeline_process_documents(self, string_or_strings: str | list[str]):
         """Input list remove duplicates, generate document IDs and initial pendding status, filter out already stored documents, store docs
         Args:
             string_or_strings: Single document string or list of document strings
@@ -628,20 +628,24 @@ class LightRAG:
 
         # 4. Store original document
         for doc_id, doc in new_docs.items():
-            await self.full_docs.upsert({doc_id: {"content": doc["content"]}})
-            await self.full_docs.change_status(doc_id, DocStatus.PENDING)
+            await self.full_docs.upsert(
+                {
+                    doc_id: {
+                        "content": doc["content"],
+                        "status": DocStatus.PENDING
+                    }
+                }
+            )
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
     async def apipeline_process_chunks(self):
         """Get pendding documents, split into chunks,insert chunks"""
         # 1. get all pending and failed documents
         _todo_doc_keys = []
-        _failed_doc = await self.full_docs.get_by_status_and_ids(
-            status=DocStatus.FAILED, ids=None
-        )
-        _pendding_doc = await self.full_docs.get_by_status_and_ids(
-            status=DocStatus.PENDING, ids=None
-        )
+
+        _failed_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.FAILED)
+        _pendding_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.PENDING)
+
         if _failed_doc:
             _todo_doc_keys.extend([doc["id"] for doc in _failed_doc])
         if _pendding_doc:
@@ -671,7 +675,7 @@ class LightRAG:
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
                         **dp,
                         "full_doc_id": doc_id,
-                        "status": DocStatus.PENDING,
+                        "status": DocStatus.PROCESSED,
                     }
                     for dp in chunking_by_token_size(
                         doc["content"],
@@ -681,17 +685,15 @@ class LightRAG:
                     )
                 }
                 chunk_cnt += len(chunks)
-                await self.text_chunks.upsert(chunks)
-                await self.text_chunks.change_status(doc_id, DocStatus.PROCESSING)
-
+
                 try:
                     # Store chunks in vector database
                     await self.chunks_vdb.upsert(chunks)
                     # Update doc status
-                    await self.full_docs.change_status(doc_id, DocStatus.PROCESSED)
+                    await self.text_chunks.upsert({**chunks, "status": DocStatus.PENDING})
                 except Exception as e:
                     # Mark as failed if any step fails
-                    await self.full_docs.change_status(doc_id, DocStatus.FAILED)
+                    await self.text_chunks.upsert({**chunks, "status": DocStatus.FAILED})
                     raise e
         except Exception as e:
             import traceback
@@ -705,12 +707,8 @@ class LightRAG:
         """Get pendding or failed chunks, extract entities and relationships from each chunk"""
         # 1. get all pending and failed chunks
         _todo_chunk_keys = []
-        _failed_chunks = await self.text_chunks.get_by_status_and_ids(
-            status=DocStatus.FAILED, ids=None
-        )
-        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(
-            status=DocStatus.PENDING, ids=None
-        )
+        _failed_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.FAILED)
+        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.PENDING)
         if _failed_chunks:
             _todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks])
         if _pendding_chunks:
@@ -744,11 +742,11 @@ class LightRAG:
                         if maybe_new_kg is None:
                             logger.info("No entities or relationships extracted!")
                         # Update status to processed
-                        await self.text_chunks.change_status(chunk_id, DocStatus.PROCESSED)
+                        await self.text_chunks.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
                     except Exception as e:
                         logger.error("Failed to extract entities and relationships")
                         # Mark as failed if any step fails
-                        await self.text_chunks.change_status(chunk_id, DocStatus.FAILED)
+                        await self.text_chunks.upsert({chunk_id: {"status": DocStatus.FAILED}})
                         raise e
 
         with tqdm_async(
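
Usage note (not part of the patch): a minimal sketch of how the staged pipeline touched by this diff would be driven end to end. `apipeline_process_documents` and `apipeline_process_chunks` appear in the hunks above; the name of the entity-extraction stage and the constructor arguments are assumptions, since those `def` lines fall outside this diff.

    import asyncio

    from lightrag import LightRAG


    async def main():
        # Assumed constructor usage; configure storages/LLM as the project docs describe.
        rag = LightRAG(working_dir="./rag_storage")

        # Stage 1: dedupe inputs, assign doc IDs, store docs with status PENDING.
        await rag.apipeline_process_documents(["first document ...", "second document ..."])

        # Stage 2: chunk PENDING/FAILED docs and upsert the chunks.
        await rag.apipeline_process_chunks()

        # Stage 3: extract entities and relationships from PENDING/FAILED chunks.
        # Method name assumed from the hunk at @@ -705; its def is not shown in this diff.
        await rag.apipeline_process_extract_graph()


    asyncio.run(main())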