fixed the processed
@@ -497,11 +497,8 @@ class LightRAG:
         # 3. Filter out already processed documents
         add_doc_keys: set[str] = set()
-        for doc_id in new_docs.keys():
-            current_doc = await self.doc_status.get_by_id(doc_id)
-            if not current_doc or current_doc["status"] == DocStatus.FAILED:
-                add_doc_keys.add(doc_id)
+        excluded_ids = await self.doc_status.all_keys()
+        add_doc_keys = new_docs.keys() - excluded_ids

         new_docs = {k: v for k, v in new_docs.items() if k in add_doc_keys}

         if not new_docs:
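The rewritten filter replaces the per-document status lookup with one bulk key fetch and a set difference. Note that, unlike the old loop, it no longer re-admits documents whose previous status is FAILED; retries are instead picked up by the status scans in the later pipeline stages below. A minimal sketch of the idea, with a plain set standing in for `await self.doc_status.all_keys()` (names illustrative, not LightRAG's API):

def filter_new_docs(new_docs: dict[str, dict], tracked_ids: set[str]) -> dict[str, dict]:
    # One bulk set difference instead of one status lookup per document.
    add_doc_keys = new_docs.keys() - tracked_ids
    return {k: v for k, v in new_docs.items() if k in add_doc_keys}

assert filter_new_docs({"a": {}, "b": {}}, {"a"}) == {"b": {}}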
@@ -509,7 +506,7 @@ class LightRAG:
             return

         # 4. Store original document
-        await self.full_docs.upsert(new_docs)
+        await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")

     async def apipeline_process_chunks(
@@ -533,21 +530,22 @@ class LightRAG:
         to_process_doc_keys: list[str] = []

         # Process failed documents
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.FAILED)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])

         # Process pending documents
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.PENDING)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])

         if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
             return

-        full_docs_ids = await self.full_docs.get_by_ids(to_process_doc_keys)
-        new_docs = {doc["id"]: doc for doc in full_docs_ids or []}
+        # Keep only documents whose chunks are not yet stored in text_chunks
+        new_docs_ids = await self.text_chunks.filter_keys(to_process_doc_keys)
+        new_docs = await self.doc_status.get_by_ids(list(new_docs_ids))

         if not new_docs:
             logger.info("All documents have been processed or are duplicates")
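The new lookup leans on `filter_keys`, which in LightRAG's KV-storage interface returns the subset of the given keys that are not yet stored, so here it yields the documents whose chunks still need to be written. A stand-in showing the contract this hunk relies on (a sketch, not the real storage class):

class InMemoryKV:
    """Minimal stand-in for a LightRAG-style KV storage."""

    def __init__(self) -> None:
        self._data: dict[str, dict] = {}

    async def filter_keys(self, keys: list[str]) -> set[str]:
        # Keys NOT yet present, i.e. the ones that still need processing.
        return {k for k in keys if k not in self._data}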
@@ -555,12 +553,10 @@ class LightRAG:

         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
-        for i in range(0, len(new_docs), batch_size):
-            batch_docs = dict(list(new_docs.items())[i : i + batch_size])
-            for doc_id, doc in tqdm_async(
-                batch_docs.items(), desc=f"Processing batch {i // batch_size + 1}"
-            ):
+        batch_docs_list = [new_docs[i : i + batch_size] for i in range(0, len(new_docs), batch_size)]
+        for i, el in enumerate(batch_docs_list):
+            items = ((k, v) for d in el for k, v in d.items())
+            for doc_id, doc in tqdm_async(items, desc=f"Level 1 - Splitting docs in batch {i + 1}"):
                 doc_status: dict[str, Any] = {
                     "content_summary": doc["content_summary"],
                     "content_length": doc["content_length"],
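The loop now slices the document list into fixed-size batches up front, then flattens each batch of `{id: doc}` dicts into `(id, doc)` pairs. The same pattern in isolation (runnable sketch; the data is made up):

batch_size = 10
docs = [{f"doc-{n}": f"content {n}"} for n in range(25)]  # stand-in payloads

batches = [docs[i : i + batch_size] for i in range(0, len(docs), batch_size)]
for i, batch in enumerate(batches):
    items = ((k, v) for d in batch for k, v in d.items())  # flatten {id: doc} pairs
    for doc_id, doc in items:
        pass  # split into chunks, update status, ...

print(len(batches))  # 3 -> batches of 10, 10, and 5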
@@ -570,7 +566,7 @@ class LightRAG:
                 }
                 try:
                     await self.doc_status.upsert({doc_id: doc_status})

                     # Generate chunks from document
                     chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
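Chunk keys are content-addressed via `compute_mdhash_id`; in LightRAG this is an MD5 hex digest of the chunk text with a type prefix. A sketch consistent with that behavior:

import hashlib

def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Deterministic, content-addressed key: re-inserting identical chunk
    # text can never create a duplicate entry.
    return prefix + hashlib.md5(content.encode()).hexdigest()

print(compute_mdhash_id("some chunk text", prefix="chunk-"))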
@@ -588,16 +584,21 @@ class LightRAG:
                     }

                     # Update status with chunks information
+                    await self._process_entity_relation_graph(chunks)
+                    await self.chunks_vdb.upsert(chunks)
+                    await self.text_chunks.upsert(chunks)
                     doc_status.update(
                         {
+                            "status": DocStatus.PROCESSED,
                             "chunks_count": len(chunks),
                             "updated_at": datetime.now().isoformat(),
                         }
                     )
-                    await self.chunks_vdb.upsert(chunks)
                     await self.doc_status.upsert({doc_id: doc_status})

                 except Exception as e:
+                    # Update status with failed information
                     doc_status.update(
                         {
                             "status": DocStatus.FAILED,
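The success path now writes the graph, the chunk vectors, and the chunk KV entries before recording `PROCESSED` with the chunk count and a timestamp in a single update; the except branch mirrors it with `FAILED`. The shape of that record in isolation (field names as in the hunk; the status enum is a stand-in):

from datetime import datetime

def mark(doc_status: dict, status: str, chunks_count: int) -> dict:
    # Same record shape as the hunk: status, chunk count, and a timestamp,
    # written in one update() before the final doc_status upsert.
    doc_status.update(
        {
            "status": status,  # stands in for DocStatus.PROCESSED / FAILED
            "chunks_count": chunks_count,
            "updated_at": datetime.now().isoformat(),
        }
    )
    return doc_status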
@@ -611,6 +612,26 @@ class LightRAG:
                     )
                     continue

+    async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
+        try:
+            new_kg = await extract_entities(
+                chunk,
+                knowledge_graph_inst=self.chunk_entity_relation_graph,
+                entity_vdb=self.entities_vdb,
+                relationships_vdb=self.relationships_vdb,
+                llm_response_cache=self.llm_response_cache,
+                global_config=asdict(self),
+            )
+            if new_kg is None:
+                logger.info("No entities or relationships extracted!")
+            else:
+                self.chunk_entity_relation_graph = new_kg
+
+        except Exception as e:
+            logger.error("Failed to extract entities and relationships")
+            raise e
+
     async def apipeline_process_extract_graph(self):
         """
         Process pending or failed chunks to extract entities and relationships.
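The new helper centralizes graph extraction for the batch loop above (`await self._process_entity_relation_graph(chunks)`): `extract_entities` returns `None` when nothing was found, otherwise the updated graph to adopt, and errors are logged and re-raised so the caller's except branch marks the document FAILED. The keep-or-replace contract in miniature (sketch only):

async def adopt_if_extracted(current_graph, extract):
    # Keep the existing graph when extraction finds nothing; otherwise
    # swap in the newly built one, as the helper above does.
    new_kg = await extract()
    return current_graph if new_kg is None else new_kg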
@@ -633,12 +654,12 @@ class LightRAG:
         to_process_doc_keys: list[str] = []

         # Process failed documents
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.FAILED)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])

         # Process pending documents
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.PENDING)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])

@@ -658,29 +679,31 @@ class LightRAG:
-            chunks: dict[str, Any] = {
-                i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
-            }
-            # Extract and store entities and relationships
-            try:
-                maybe_new_kg = await extract_entities(
-                    chunks,
-                    knowledge_graph_inst=self.chunk_entity_relation_graph,
-                    entity_vdb=self.entities_vdb,
-                    relationships_vdb=self.relationships_vdb,
-                    llm_response_cache=self.llm_response_cache,
-                    global_config=asdict(self),
-                )
-                if maybe_new_kg is None:
-                    logger.warning("No entities or relationships extracted!")
-                # Update status to processed
-                await self.text_chunks.upsert(
-                    {chunk_id: {"status": DocStatus.PROCESSED}}
-                )
-            except Exception as e:
-                logger.error("Failed to extract entities and relationships")
-                # Mark as failed if any step fails
-                await self.text_chunks.upsert(
-                    {chunk_id: {"status": DocStatus.FAILED}}
-                )
-                raise e
+        async def _process_chunk(chunk_id: str):
+            chunks: dict[str, Any] = {
+                i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
+            }
+
+            # Extract and store entities and relationships
+            try:
+                maybe_new_kg = await extract_entities(
+                    chunks,
+                    knowledge_graph_inst=self.chunk_entity_relation_graph,
+                    entity_vdb=self.entities_vdb,
+                    relationships_vdb=self.relationships_vdb,
+                    llm_response_cache=self.llm_response_cache,
+                    global_config=asdict(self),
+                )
+                if maybe_new_kg is None:
+                    logger.warning("No entities or relationships extracted!")
+                # Update status to processed
+                await self.text_chunks.upsert(chunks)
+                await self.doc_status.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
+            except Exception as e:
+                logger.error("Failed to extract entities and relationships")
+                # Mark as failed if any step fails
+                await self.doc_status.upsert({chunk_id: {"status": DocStatus.FAILED}})
+                raise e

         with tqdm_async(
             total=len(to_process_doc_keys),
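The hunk ends as the `tqdm_async` progress context opens, so the code that drives `_process_chunk` is not visible in this diff. One plausible way such per-chunk coroutines get scheduled (an assumption for illustration, not necessarily what this commit does) is `asyncio.gather`:

import asyncio

async def run_all(chunk_ids: list[str], process) -> list:
    # One task per chunk id; return_exceptions=True lets the remaining
    # chunks finish even if one fails (that doc is already marked FAILED).
    return await asyncio.gather(
        *(process(cid) for cid in chunk_ids), return_exceptions=True
    )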