improved get status

Author: Yannick Stephan
Date: 2025-02-09 15:24:52 +01:00
parent 948d21b41d
commit 1949c4a2c6

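At its core, the commit drops the ID-only _get_pending_documents helper and instead pulls the full DocProcessingStatus record for every failed or pending document, keyed by document id. The sketch below is a minimal, self-contained illustration of that pattern; the dataclass fields mirror the ones touched in the diff, while InMemoryDocStatusStorage and its sample documents are hypothetical stand-ins, not LightRAG's real storage backends.

# Minimal sketch of the new "get status" flow: failed and pending documents are
# merged into one dict keyed by doc id, keeping the whole status record.
# InMemoryDocStatusStorage and its sample data are hypothetical stand-ins.
import asyncio
from dataclasses import dataclass


@dataclass
class DocProcessingStatus:
    content: str
    content_summary: str
    content_length: int
    status: str
    created_at: str


class InMemoryDocStatusStorage:
    def __init__(self, docs: dict[str, DocProcessingStatus]) -> None:
        self._docs = docs

    async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
        return {k: v for k, v in self._docs.items() if v.status == "FAILED"}

    async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
        return {k: v for k, v in self._docs.items() if v.status == "PENDING"}


async def collect_docs_to_process(
    doc_status: InMemoryDocStatusStorage,
) -> dict[str, DocProcessingStatus]:
    # Mirrors the new apipeline_process_enqueue_documents preamble:
    # merge failed docs first, then pending docs, into one dict.
    to_process_docs: dict[str, DocProcessingStatus] = {}
    to_process_docs.update(await doc_status.get_failed_docs())
    to_process_docs.update(await doc_status.get_pending_docs())
    return to_process_docs


storage = InMemoryDocStatusStorage(
    {
        "doc-1": DocProcessingStatus("text a", "text a", 6, "PENDING", "2025-02-09"),
        "doc-2": DocProcessingStatus("text b", "text b", 6, "FAILED", "2025-02-09"),
        "doc-3": DocProcessingStatus("text c", "text c", 6, "PROCESSED", "2025-02-09"),
    }
)
print(list(asyncio.run(collect_docs_to_process(storage))))  # ['doc-2', 'doc-1']

Because the status record now travels with the id, the processing loop below no longer needs a separate doc_status.get_by_id round trip before marking a document as PROCESSING.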

@@ -13,7 +13,6 @@ from .operate import (
     kg_query_with_keywords,
     mix_kg_vector_query,
     naive_query,
-    # local_query,global_query,hybrid_query,,
 )
 from .utils import (
@@ -28,6 +27,7 @@ from .base import (
     BaseGraphStorage,
     BaseKVStorage,
     BaseVectorStorage,
+    DocProcessingStatus,
     DocStatus,
     DocStatusStorage,
     QueryParam,
@@ -396,7 +396,9 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_enqueue_documents(string_or_strings)
-        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(
+            split_by_character, split_by_character_only
+        )
 
     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -512,26 +514,6 @@ class LightRAG:
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
-    async def _get_pending_documents(self) -> list[str]:
-        """Fetch all pending and failed documents."""
-        to_process_doc_keys: list[str] = []
-
-        # Fetch failed documents
-        failed_docs = await self.doc_status.get_failed_docs()
-        if failed_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
-
-        # Fetch pending documents
-        pending_docs = await self.doc_status.get_pending_docs()
-        if pending_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return []
-
-        return to_process_doc_keys
-
     async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
@@ -548,46 +530,53 @@ class LightRAG:
         4. Update the document status
         """
         # 1. get all pending and failed documents
-        pending_doc_ids = await self._get_pending_documents()
+        to_process_docs: dict[str, DocProcessingStatus] = {}
 
-        if not pending_doc_ids:
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_failed_docs()
+        to_process_docs.update(failed_docs)
+
+        pending_docs = await self.doc_status.get_pending_docs()
+        to_process_docs.update(pending_docs)
+
+        if not to_process_docs:
             logger.info("All documents have been processed or are duplicates")
             return
 
+        to_process_docs_ids = list(to_process_docs.keys())
         # Get allready processed documents (text chunks and full docs)
-        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
-            pending_doc_ids
-        )
-        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(to_process_docs_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(to_process_docs_ids)
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            pending_doc_ids[i : i + batch_size]
-            for i in range(0, len(pending_doc_ids), batch_size)
+            list(to_process_docs.items())[i : i + batch_size]
+            for i in range(0, len(to_process_docs), batch_size)
         ]
 
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
-        for batch_idx, doc_ids in tqdm_async(
+        for batch_idx, ids_doc_processing_status in tqdm_async(
            enumerate(batch_docs_list),
            desc="Process Batches",
         ):
             # 4. iterate over batch
-            for doc_id in tqdm_async(
-                doc_ids,
+            for id_doc_processing_status in tqdm_async(
+                ids_doc_processing_status,
                 desc=f"Process Batch {batch_idx}",
             ):
                 # Update status in processing
-                status_doc = await self.doc_status.get_by_id(doc_id)
+                id_doc, status_doc = id_doc_processing_status
                 await self.doc_status.upsert(
                     {
-                        doc_id: {
+                        id_doc: {
                             "status": DocStatus.PROCESSING,
                             "updated_at": datetime.now().isoformat(),
-                            "content_summary": status_doc["content_summary"],
-                            "content_length": status_doc["content_length"],
-                            "created_at": status_doc["created_at"],
+                            "content_summary": status_doc.content_summary,
+                            "content_length": status_doc.content_length,
+                            "created_at": status_doc.created_at,
                         }
                     }
                 )
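The batch split earlier in this hunk is plain slicing over the merged dict's (doc_id, status) pairs, insert_batch_size at a time (the diff keeps the default of 10 from addon_params). A small sketch with made-up ids and a batch size of 3; only the comprehension itself comes from the diff:

# Sketch of the batch split: cut the (doc_id, status) pairs into groups of
# insert_batch_size. Ids, payloads and the batch size of 3 are examples only.
to_process_docs = {f"doc-{i}": {"status": "PENDING"} for i in range(7)}
batch_size = 3  # LightRAG reads this from addon_params, "insert_batch_size", default 10

batch_docs_list = [
    list(to_process_docs.items())[i : i + batch_size]
    for i in range(0, len(to_process_docs), batch_size)
]

for batch_idx, batch in enumerate(batch_docs_list):
    print(batch_idx, [doc_id for doc_id, _ in batch])
# 0 ['doc-0', 'doc-1', 'doc-2']
# 1 ['doc-3', 'doc-4', 'doc-5']
# 2 ['doc-6']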
@@ -595,10 +584,10 @@ class LightRAG:
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
                         **dp,
-                        "full_doc_id": doc_id,
+                        "full_doc_id": id_doc_processing_status,
                     }
                     for dp in self.chunking_func(
-                        status_doc["content"],
+                        status_doc.content,
                         split_by_character,
                         split_by_character_only,
                         self.chunk_overlap_token_size,
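The chunks dict in this hunk is keyed by a hash of each chunk's content with a chunk- prefix, and every entry records its parent document id. A rough stand-alone approximation follows; mdhash_id uses hashlib directly and only mimics LightRAG's compute_mdhash_id, whose exact behaviour is assumed here, and the chunk payloads are invented:

# Approximate sketch of the chunk keying: an md5 of the chunk content with a
# "chunk-" prefix, each entry carrying its parent doc id. mdhash_id is a
# stand-in for compute_mdhash_id; the real helper may differ in detail.
from hashlib import md5


def mdhash_id(content: str, prefix: str = "") -> str:
    return prefix + md5(content.encode()).hexdigest()


id_doc = "doc-1"
raw_chunks = [{"content": "first chunk"}, {"content": "second chunk"}]  # example chunk dicts

chunks = {
    mdhash_id(dp["content"], prefix="chunk-"): {**dp, "full_doc_id": id_doc}
    for dp in raw_chunks
}
for key, value in chunks.items():
    print(key, value["full_doc_id"])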
@@ -611,25 +600,26 @@ class LightRAG:
                 await self._process_entity_relation_graph(chunks)
                 await self.chunks_vdb.upsert(chunks)
+                tasks[id_doc] = []
                 # Check if document already processed the doc
-                if doc_id not in full_docs_processed_doc_ids:
-                    tasks[doc_id].append(
+                if id_doc not in full_docs_processed_doc_ids:
+                    tasks[id_doc].append(
                         self.full_docs.upsert(
-                            {doc_id: {"content": status_doc["content"]}}
+                            {id_doc: {"content": status_doc.content}}
                         )
                     )
 
                 # Check if chunks already processed the doc
-                if doc_id not in text_chunks_processed_doc_ids:
-                    tasks[doc_id].append(self.text_chunks.upsert(chunks))
+                if id_doc not in text_chunks_processed_doc_ids:
+                    tasks[id_doc].append(self.text_chunks.upsert(chunks))
 
             # Process document (text chunks and full docs) in parallel
-            for doc_id, task in tasks.items():
+            for id_doc_processing_status, task in tasks.items():
                 try:
                     await asyncio.gather(*task)
                     await self.doc_status.upsert(
                         {
-                            doc_id: {
+                            id_doc_processing_status: {
                                 "status": DocStatus.PROCESSED,
                                 "chunks_count": len(chunks),
                                 "updated_at": datetime.now().isoformat(),
@@ -639,10 +629,10 @@ class LightRAG:
                     await self._insert_done()
 
                 except Exception as e:
-                    logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                    logger.error(f"Failed to process document {id_doc_processing_status}: {str(e)}")
                     await self.doc_status.upsert(
                         {
-                            doc_id: {
+                            id_doc_processing_status: {
                                 "status": DocStatus.FAILED,
                                 "error": str(e),
                                 "updated_at": datetime.now().isoformat(),