Make the document-processing pipeline clearer
@@ -509,6 +509,26 @@ class LightRAG:
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
+    async def _get_pending_documents(self) -> list[str]:
+        """Fetch all pending and failed documents."""
+        to_process_doc_keys: list[str] = []
+
+        # Fetch failed documents
+        failed_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
+        if failed_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in failed_docs])
+
+        # Fetch pending documents
+        pending_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
+        if pending_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in pending_docs])
+
+        if not to_process_doc_keys:
+            logger.info("All documents have been processed or are duplicates")
+            return []
+
+        return to_process_doc_keys
+
     async def apipeline_process_chunks(
         self,
         split_by_character: str | None = None,
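The new _get_pending_documents helper above replaces the inline status lookups that the next hunk removes from apipeline_process_chunks. A minimal usage sketch, assuming an already-initialized LightRAG instance named rag (not part of this commit):

# Hypothetical usage sketch (not part of the commit); `rag` stands for an
# already-initialized LightRAG instance.
async def show_pending(rag) -> None:
    pending_doc_ids = await rag._get_pending_documents()
    # e.g. ["doc-abc123", "doc-def456"], or [] when nothing is FAILED or PENDING
    print(pending_doc_ids)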
@@ -527,52 +547,36 @@ class LightRAG:
             when split_by_character is None, this parameter is ignored.
         """
         # 1. get all pending and failed documents
-        to_process_doc_keys: list[str] = []
-
-        # Process failes
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        # Process Pending
-        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
-        if to_process_docs:
-            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-        if not to_process_doc_keys:
-            logger.info("All documents have been processed or are duplicates")
-            return
-
-        # If included in text_chunks is all processed, return
-        new_docs = await self.doc_status.get_by_ids(to_process_doc_keys)
-        text_chunks_new_docs_ids = await self.text_chunks.filter_keys(
-            to_process_doc_keys
-        )
-        full_docs_new_docs_ids = await self.full_docs.filter_keys(to_process_doc_keys)
-
-        if not new_docs:
+        pending_doc_ids = await self._get_pending_documents()
+
+        if not pending_doc_ids:
             logger.info("All documents have been processed or are duplicates")
             return
 
+        # Get already processed documents (text chunks and full docs)
+        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(pending_doc_ids)
+        full_docs_processed_doc_ids = await self.full_docs.filter_keys(pending_doc_ids)
+
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
         batch_docs_list = [
-            new_docs[i : i + batch_size] for i in range(0, len(new_docs), batch_size)
+            pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
         ]
-        for i, el in enumerate(batch_docs_list):
-            items = ((k, v) for d in el for k, v in d.items())
-
-            tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
+        batch_len = len(batch_docs_list) + 1
+        # 3. iterate over batches
+        tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
+        for batch_idx, doc_ids in enumerate(batch_docs_list):
 
             doc_status: dict[str, Any] = {
                 "status": DocStatus.PROCESSING,
                 "updated_at": datetime.now().isoformat(),
             }
 
-            for doc_id, doc in tqdm_async(
-                items,
-                desc=f"Level 1 - Spliting doc in batch {i // len(batch_docs_list) + 1}",
+            for doc_id in tqdm_async(
+                doc_ids,
+                desc=f"Level 1 - Batch {batch_idx} / {batch_len}",
             ):
+                doc = await self.doc_status.get_by_id(doc_id)
                 doc_status.update(
                     {
                         "content_summary": doc["content_summary"],
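The batching introduced above is plain list slicing over document ids. A standalone sketch of the same expression with made-up ids and the default insert_batch_size of 10 used in this hunk:

batch_size = 10  # default for addon_params["insert_batch_size"] in this diff
pending_doc_ids = [f"doc-{i}" for i in range(23)]  # made-up ids

batch_docs_list = [
    pending_doc_ids[i : i + batch_size]
    for i in range(0, len(pending_doc_ids), batch_size)
]
print([len(batch) for batch in batch_docs_list])  # [10, 10, 3]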
@@ -580,7 +584,6 @@ class LightRAG:
                         "created_at": doc["created_at"],
                     }
                 )
-
                 await self.doc_status.upsert({doc_id: doc_status})
 
                 # Generate chunks from document
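For reference, the per-document status record built and upserted in the loop above looks roughly like this; the field values below are illustrative, not taken from the diff:

from datetime import datetime

# Illustrative payload only; the real values come from the stored doc record.
doc_status = {
    "status": "processing",  # DocStatus.PROCESSING in the actual code
    "updated_at": datetime.now().isoformat(),
    "content_summary": "First lines of the document ...",
    "created_at": "2024-11-01T09:30:00",
}
# Written back per document with:
# await self.doc_status.upsert({doc_id: doc_status})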
@@ -614,12 +617,12 @@ class LightRAG:
                 )
                 await self.doc_status.upsert({doc_id: doc_status})
 
-                if doc_id not in full_docs_new_docs_ids:
+                if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
                         self.full_docs.upsert({doc_id: {"content": doc["content"]}})
                     )
 
-                if doc_id not in text_chunks_new_docs_ids:
+                if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))
 
             for doc_id, task in tasks.items():
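The diff ends just as the per-document coroutine lists in tasks start to be consumed, so the body of that final loop is not shown. A toy, self-contained sketch of draining such a mapping with asyncio.gather; the gather-based draining and every name below are assumptions, not taken from this commit:

import asyncio

async def fake_upsert(payload: dict) -> None:
    # Stand-in for a storage call such as full_docs.upsert or text_chunks.upsert.
    await asyncio.sleep(0)

async def main() -> None:
    # Shaped like the `tasks` mapping built above: doc id -> list of coroutines.
    tasks = {
        "doc-1": [fake_upsert({"content": "..."}), fake_upsert({"chunk-1": "..."})],
        "doc-2": [fake_upsert({"content": "..."})],
    }
    for doc_id, task_list in tasks.items():
        await asyncio.gather(*task_list)
        print(f"finished {doc_id}")

asyncio.run(main())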