updated naming
@@ -396,7 +396,7 @@ class LightRAG:
             split_by_character is None, this parameter is ignored.
         """
         await self.apipeline_process_documents(string_or_strings)
-        await self.apipeline_process_chunks(split_by_character, split_by_character_only)
+        await self.apipeline_process_enqueue_documents(split_by_character, split_by_character_only)
 
     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
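For context, this call site delegates insertion to the two-stage async pipeline being renamed in this commit. Below is a minimal usage sketch of the renamed coroutines; the import path and constructor arguments are placeholders, not taken from this diff.

import asyncio

from lightrag import LightRAG  # import path assumed, not shown in this diff


async def ingest(docs: list[str]) -> None:
    # Placeholder construction: the real arguments (LLM function, embedding
    # function, storage settings) are not part of this diff.
    rag = LightRAG(working_dir="./rag_storage")

    # Stage 1: deduplicate the inputs and enqueue them with a pending status.
    await rag.apipeline_enqueue_documents(docs)

    # Stage 2: chunk pending documents, extract entities and relations,
    # and update each document's status.
    await rag.apipeline_process_enqueue_documents(
        split_by_character=None,
        split_by_character_only=False,
    )


asyncio.run(ingest(["Doc one ...", "Doc two ..."]))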
@@ -465,7 +465,7 @@ class LightRAG:
         if update_storage:
             await self._insert_done()
 
-    async def apipeline_process_documents(self, string_or_strings: str | list[str]):
+    async def apipeline_enqueue_documents(self, string_or_strings: str | list[str]):
        """Pipeline process documents
 
        1. Remove duplicate contents from the list
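The enqueue stage's first documented step is removing duplicate contents. A rough sketch of that idea, keying documents by a content hash; the md5 hashing and the "doc-" prefix are illustrative assumptions, not read from this diff.

from hashlib import md5


def dedup_by_content(contents: list[str]) -> dict[str, str]:
    # Collapse identical submissions to a single entry keyed by content hash.
    unique: dict[str, str] = {}
    for text in contents:
        text = text.strip()
        unique["doc-" + md5(text.encode("utf-8")).hexdigest()] = text
    return unique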
@@ -505,7 +505,7 @@ class LightRAG:
             logger.info("All documents have been processed or are duplicates")
             return
 
-        # 4. Store original document
+        # 4. Store status document
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
@@ -529,22 +529,20 @@ class LightRAG:
 
         return to_process_doc_keys
 
-    async def apipeline_process_chunks(
+    async def apipeline_process_enqueue_documents(
         self,
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
     ) -> None:
-        """Pipeline process chunks
+        """
+        Process pending documents by splitting them into chunks, processing
+        each chunk for entity and relation extraction, and updating the
+        document status.
 
-        1. Get pending documents
-        2. Split documents into chunks
-        3. Insert chunks
-
-        Args:
-            split_by_character (str | None): If not None, split the string by character, if chunk longer than
-            chunk_size, split the sub chunk by token size.
-            split_by_character_only (bool): If split_by_character_only is True, split the string by character only,
-            when split_by_character is None, this parameter is ignored.
+        1. Get all pending and failed documents
+        2. Split document content into chunks
+        3. Process each chunk for entity and relation extraction
+        4. Update the document status
         """
         # 1. get all pending and failed documents
         pending_doc_ids = await self._get_pending_documents()
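The Args section removed here (the same wording stays on ainsert's docstring) describes character-first splitting with a size fallback. A simplified sketch of those semantics follows, using character counts where the real chunker works on token counts; it is an illustration of the documented behavior, not LightRAG's implementation.

def split_text(
    text: str,
    chunk_size: int,
    split_by_character: str | None = None,
    split_by_character_only: bool = False,
) -> list[str]:
    # Illustration of the documented parameter semantics only.
    if split_by_character is None:
        # No delimiter given: plain fixed-size splitting.
        return [text[i : i + chunk_size] for i in range(0, len(text), chunk_size)]

    pieces = text.split(split_by_character)
    if split_by_character_only:
        # Delimiter-only mode: never re-split long pieces.
        return pieces

    # Delimiter first, then re-split any piece that is still too long.
    chunks: list[str] = []
    for piece in pieces:
        if len(piece) > chunk_size:
            chunks.extend(piece[i : i + chunk_size] for i in range(0, len(piece), chunk_size))
        else:
            chunks.append(piece)
    return chunks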
@@ -612,10 +610,11 @@ class LightRAG:
                     self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
                 )
 
-            # check if chunks already processed the doc
+            # Check if chunks already processed the doc
            if doc_id not in text_chunks_processed_doc_ids:
                tasks[doc_id].append(self.text_chunks.upsert(chunks))
 
+        # Process document (text chunks and full docs) in parallel
         for doc_id, task in tasks.items():
             try:
                 await asyncio.gather(*task)
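The excerpt ends inside the try block. A generic, standalone sketch of the fan-out pattern used here, with the per-document failure handling assumed since it is not visible in this diff.

import asyncio


async def run_doc_tasks(tasks: dict[str, list]) -> dict[str, str]:
    # Run each document's storage upserts concurrently and record the outcome.
    results: dict[str, str] = {}
    for doc_id, coros in tasks.items():
        try:
            await asyncio.gather(*coros)
            results[doc_id] = "processed"
        except Exception:
            results[doc_id] = "failed"
    return results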