cleaned docs

Author: Yannick Stephan
Date: 2025-02-09 14:36:49 +01:00
Parent: 37943a65a3
Commit: abcdcd5a73
2 changed files with 31 additions and 27 deletions


@@ -121,9 +121,8 @@ async def main():
         texts = [x for x in all_text.split("\n") if x]

         # New mode use pipeline
-        await rag.apipeline_process_documents(texts)
-        await rag.apipeline_process_chunks()
-        await rag.apipeline_process_extract_graph()
+        await rag.apipeline_enqueue_documents(texts)
+        await rag.apipeline_process_enqueue_documents()

         # Old method use ainsert
         # await rag.ainsert(texts)
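The example now drives ingestion in two explicit steps: enqueue the documents, then run one processing pass over everything enqueued. A minimal sketch of how a caller might use the renamed pipeline, assuming an already-configured LightRAG instance (`rag`); the two method names come from this diff, the file handling around them is illustrative:

```python
import asyncio


async def ingest(rag, path: str) -> None:
    # Read the source file and drop empty lines, mirroring the example above.
    with open(path, encoding="utf-8") as f:
        texts = [line for line in f.read().split("\n") if line]

    # Step 1: enqueue the documents for processing.
    await rag.apipeline_enqueue_documents(texts)

    # Step 2: chunk the enqueued documents, extract entities/relations,
    # and update each document's status.
    await rag.apipeline_process_enqueue_documents()


# asyncio.run(ingest(rag, "book.txt"))  # "book.txt" is a placeholder path
```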


@@ -395,7 +395,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_process_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )

     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -511,23 +513,23 @@ class LightRAG:
     async def _get_pending_documents(self) -> list[str]:
         """Fetch all pending and failed documents."""
         to_process_doc_keys: list[str] = []

         # Fetch failed documents
         failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if failed_docs:
             to_process_doc_keys.extend([doc["id"] for doc in failed_docs])

         # Fetch pending documents
         pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if pending_docs:
             to_process_doc_keys.extend([doc["id"] for doc in pending_docs])

         if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
             return []

         return to_process_doc_keys

     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
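`_get_pending_documents` is what gives the pipeline its retry behaviour: documents marked FAILED are picked up again alongside PENDING ones, and an empty result ends the run early. A rough sketch of an in-memory status store that would satisfy the `get_by_status(status=...)` calls and the `doc["id"]` access shown above; the class and its field layout are hypothetical:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class DocStatus(str, Enum):
    # Illustrative values; only PENDING and FAILED appear in the hunk above.
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


@dataclass
class InMemoryDocStatus:
    # doc_id -> record; each record carries at least a "status" field.
    _docs: dict[str, dict[str, Any]] = field(default_factory=dict)

    async def get_by_status(self, status: DocStatus) -> list[dict[str, Any]]:
        # Return full records including the id, so callers can read doc["id"].
        return [
            {"id": doc_id, **record}
            for doc_id, record in self._docs.items()
            if record.get("status") == status
        ]

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        for doc_id, record in data.items():
            self._docs.setdefault(doc_id, {}).update(record)
```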
@@ -535,36 +537,39 @@ class LightRAG:
     ) -> None:
         """
         Process pending documents by splitting them into chunks, processing
         each chunk for entity and relation extraction, and updating the
         document status.

         1. Get all pending and failed documents
         2. Split document content into chunks
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
         # 1. get all pending and failed documents
         pending_doc_ids = await self._get_pending_documents()

         if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return

         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
+            pending_doc_ids
+        )
         full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)

         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
+            pending_doc_ids[i : i + batch_size]
+            for i in range(0, len(pending_doc_ids), batch_size)
         ]

         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
         for batch_idx, doc_ids in tqdm_async(
             enumerate(batch_docs_list),
-            desc=f"Process Batches",
+            desc="Process Batches",
         ):
             # 4. iterate over batch
             for doc_id in tqdm_async(
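The batching above slices the pending document ids into groups of `insert_batch_size` (taken from `addon_params`, defaulting to 10). Isolated for clarity, the slicing idiom looks like this; the helper name is made up for the sketch:

```python
def split_into_batches(doc_ids: list[str], batch_size: int = 10) -> list[list[str]]:
    # Consecutive, non-overlapping slices; the final batch may be shorter.
    return [
        doc_ids[i : i + batch_size]
        for i in range(0, len(doc_ids), batch_size)
    ]


# split_into_batches([f"doc-{n}" for n in range(25)], 10)
# -> [["doc-0", ..., "doc-9"], ["doc-10", ..., "doc-19"], ["doc-20", ..., "doc-24"]]
```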
@@ -580,7 +585,7 @@ class LightRAG:
                             "updated_at": datetime.now().isoformat(),
                             "content_summary": status_doc["content_summary"],
                             "content_length": status_doc["content_length"],
                             "created_at": status_doc["created_at"],
                         }
                     }
                 )
@@ -599,22 +604,24 @@ class LightRAG:
                         self.tiktoken_model_name,
                     )
                 }

                 # Ensure chunk insertion and graph processing happen sequentially, not in parallel
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)

                 # Check if document already processed the doc
                 if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
+                        self.full_docs.upsert(
+                            {doc_id: {"content": status_doc["content"]}}
+                        )
                     )

                 # Check if chunks already processed the doc
                 if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))

         # Process document (text chunks and full docs) in parallel
         for doc_id, task in tasks.items():
             try:
                 await asyncio.gather(*task)
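The context lines above spell out an ordering constraint: the entity/relation graph is extracted from the chunks before the chunk vectors are upserted, with each step awaited on its own rather than gathered. A small illustration of the difference, with placeholder callables standing in for the real extraction and storage steps:

```python
import asyncio


async def sequential(build_graph, upsert_chunks, chunks) -> None:
    # Graph extraction finishes before the vector upsert starts.
    await build_graph(chunks)
    await upsert_chunks(chunks)


async def concurrent(build_graph, upsert_chunks, chunks) -> None:
    # Both coroutines run concurrently; no ordering is guaranteed between them.
    await asyncio.gather(build_graph(chunks), upsert_chunks(chunks))
```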
@@ -630,9 +637,7 @@ class LightRAG:
                 await self._insert_done()

             except Exception as e:
-                logger.error(
-                    f"Failed to process document {doc_id}: {str(e)}"
-                )
+                logger.error(f"Failed to process document {doc_id}: {str(e)}")
                 await self.doc_status.upsert(
                     {
                         doc_id: {
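The last hunk only collapses the multi-line `logger.error` call into a single line; the surrounding pattern, gathering each document's upsert tasks and marking the document FAILED when anything raises, is cut off in this view. A hedged sketch of that shape, with a generic `doc_status` store standing in for the real storage backend:

```python
import asyncio
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


async def run_document_tasks(doc_status, tasks: dict[str, list]) -> None:
    # tasks maps doc_id -> list of awaitables (full-doc and chunk upserts).
    for doc_id, task in tasks.items():
        try:
            await asyncio.gather(*task)
            # On success the real pipeline also flushes storages (_insert_done)
            # and records the document as processed.
        except Exception as e:
            logger.error(f"Failed to process document {doc_id}: {str(e)}")
            # Record the failure so _get_pending_documents can retry it later.
            await doc_status.upsert(
                {doc_id: {"status": "failed", "updated_at": datetime.now().isoformat()}}
            )
```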