fixed pipe

This commit is contained in:
Yannick Stephan
2025-02-08 23:52:27 +01:00
parent 020fdecc73
commit 2929d1fc39

View File

@@ -586,7 +586,7 @@ class LightRAG:
if update_storage: if update_storage:
await self._insert_done() await self._insert_done()
async def apipeline_process_documents(self, string_or_strings): async def apipeline_process_documents(self, string_or_strings: str | list[str]):
"""Input list remove duplicates, generate document IDs and initial pendding status, filter out already stored documents, store docs """Input list remove duplicates, generate document IDs and initial pendding status, filter out already stored documents, store docs
Args: Args:
string_or_strings: Single document string or list of document strings string_or_strings: Single document string or list of document strings
@@ -628,20 +628,24 @@ class LightRAG:
# 4. Store original document # 4. Store original document
for doc_id, doc in new_docs.items(): for doc_id, doc in new_docs.items():
await self.full_docs.upsert({doc_id: {"content": doc["content"]}}) await self.full_docs.upsert(
await self.full_docs.change_status(doc_id, DocStatus.PENDING) {
doc_id: {
"content": doc["content"],
"status": DocStatus.PENDING
}
}
)
logger.info(f"Stored {len(new_docs)} new unique documents") logger.info(f"Stored {len(new_docs)} new unique documents")
async def apipeline_process_chunks(self): async def apipeline_process_chunks(self):
"""Get pendding documents, split into chunks,insert chunks""" """Get pendding documents, split into chunks,insert chunks"""
# 1. get all pending and failed documents # 1. get all pending and failed documents
_todo_doc_keys = [] _todo_doc_keys = []
_failed_doc = await self.full_docs.get_by_status_and_ids(
status=DocStatus.FAILED, ids=None _failed_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.FAILED)
) _pendding_doc = await self.full_docs.get_by_status_and_ids(status=DocStatus.PENDING)
_pendding_doc = await self.full_docs.get_by_status_and_ids(
status=DocStatus.PENDING, ids=None
)
if _failed_doc: if _failed_doc:
_todo_doc_keys.extend([doc["id"] for doc in _failed_doc]) _todo_doc_keys.extend([doc["id"] for doc in _failed_doc])
if _pendding_doc: if _pendding_doc:
@@ -671,7 +675,7 @@ class LightRAG:
compute_mdhash_id(dp["content"], prefix="chunk-"): { compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp, **dp,
"full_doc_id": doc_id, "full_doc_id": doc_id,
"status": DocStatus.PENDING, "status": DocStatus.PROCESSED,
} }
for dp in chunking_by_token_size( for dp in chunking_by_token_size(
doc["content"], doc["content"],
@@ -681,17 +685,15 @@ class LightRAG:
) )
} }
chunk_cnt += len(chunks) chunk_cnt += len(chunks)
await self.text_chunks.upsert(chunks)
await self.text_chunks.change_status(doc_id, DocStatus.PROCESSING)
try: try:
# Store chunks in vector database # Store chunks in vector database
await self.chunks_vdb.upsert(chunks) await self.chunks_vdb.upsert(chunks)
# Update doc status # Update doc status
await self.full_docs.change_status(doc_id, DocStatus.PROCESSED) await self.text_chunks.upsert({**chunks, "status": DocStatus.PENDING})
except Exception as e: except Exception as e:
# Mark as failed if any step fails # Mark as failed if any step fails
await self.full_docs.change_status(doc_id, DocStatus.FAILED) await self.text_chunks.upsert({**chunks, "status": DocStatus.FAILED})
raise e raise e
except Exception as e: except Exception as e:
import traceback import traceback
@@ -705,12 +707,8 @@ class LightRAG:
"""Get pendding or failed chunks, extract entities and relationships from each chunk""" """Get pendding or failed chunks, extract entities and relationships from each chunk"""
# 1. get all pending and failed chunks # 1. get all pending and failed chunks
_todo_chunk_keys = [] _todo_chunk_keys = []
_failed_chunks = await self.text_chunks.get_by_status_and_ids( _failed_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.FAILED)
status=DocStatus.FAILED, ids=None _pendding_chunks = await self.text_chunks.get_by_status_and_ids(status=DocStatus.PENDING)
)
_pendding_chunks = await self.text_chunks.get_by_status_and_ids(
status=DocStatus.PENDING, ids=None
)
if _failed_chunks: if _failed_chunks:
_todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks]) _todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks])
if _pendding_chunks: if _pendding_chunks:
@@ -744,11 +742,11 @@ class LightRAG:
if maybe_new_kg is None: if maybe_new_kg is None:
logger.info("No entities or relationships extracted!") logger.info("No entities or relationships extracted!")
# Update status to processed # Update status to processed
await self.text_chunks.change_status(chunk_id, DocStatus.PROCESSED) await self.text_chunks.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
except Exception as e: except Exception as e:
logger.error("Failed to extract entities and relationships") logger.error("Failed to extract entities and relationships")
# Mark as failed if any step fails # Mark as failed if any step fails
await self.text_chunks.change_status(chunk_id, DocStatus.FAILED) await self.text_chunks.upsert({chunk_id: {"status": DocStatus.FAILED}})
raise e raise e
with tqdm_async( with tqdm_async(