simplified process

Yannick Stephan
2025-02-09 14:24:35 +01:00
parent 6a4a77bfe9
commit 5faae81417


@@ -563,29 +563,29 @@ class LightRAG:
             pending_doc_ids[i : i + batch_size] for i in range(0, len(pending_doc_ids), batch_size)
         ]
         batch_len = len(batch_docs_list) + 1
         # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
         for batch_idx, doc_ids in enumerate(batch_docs_list):
-            doc_status: dict[str, Any] = {
-                "status": DocStatus.PROCESSING,
-                "updated_at": datetime.now().isoformat(),
-            }
+            # 4. iterate over batch
             for doc_id in tqdm_async(
                 doc_ids,
                 desc=f"Level 1 - Batch {batch_idx} / {batch_len}",
             ):
-                doc = await self.doc_status.get_by_id(doc_id)
-                doc_status.update(
+                # Update status in processing
+                status_doc = await self.doc_status.get_by_id(doc_id)
+                await self.doc_status.upsert(
                     {
-                        "content_summary": doc["content_summary"],
-                        "content_length": doc["content_length"],
-                        "created_at": doc["created_at"],
+                        doc_id: {
+                            "status": DocStatus.PROCESSING,
+                            "updated_at": datetime.now().isoformat(),
+                            "content_summary": status_doc["content_summary"],
+                            "content_length": status_doc["content_length"],
+                            "created_at": status_doc["created_at"],
+                        }
                     }
                 )
-                await self.doc_status.upsert({doc_id: doc_status})
                 # Generate chunks from document
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
@@ -593,7 +593,7 @@ class LightRAG:
"full_doc_id": doc_id, "full_doc_id": doc_id,
} }
for dp in self.chunking_func( for dp in self.chunking_func(
doc["content"], status_doc["content"],
split_by_character, split_by_character,
split_by_character_only, split_by_character_only,
self.chunk_overlap_token_size, self.chunk_overlap_token_size,
@@ -601,57 +601,47 @@ class LightRAG:
                         self.tiktoken_model_name,
                     )
                 }
-                try:
-                    # If fails it's failed on full doc and text chunks upset
-                    if doc["status"] != DocStatus.FAILED:
-                        # Ensure chunk insertion and graph processing happen sequentially
-                        await self._process_entity_relation_graph(chunks)
-                        await self.chunks_vdb.upsert(chunks)
-                except Exception as e:
-                    doc_status.update(
-                        {
-                            "status": DocStatus.PENDING,
-                            "error": str(e),
-                            "updated_at": datetime.now().isoformat(),
-                        }
-                    )
-                    await self.doc_status.upsert({doc_id: doc_status})
+                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
+                await self._process_entity_relation_graph(chunks)
+                await self.chunks_vdb.upsert(chunks)
+                # Check if document already processed the doc
                 if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": doc["content"]}})
+                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
                     )
+                # check if chunks already processed the doc
                 if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))
         for doc_id, task in tasks.items():
             try:
                 await asyncio.gather(*task)
-                # Update document status
-                doc_status.update(
-                    {
-                        "status": DocStatus.PROCESSED,
-                        "chunks_count": len(chunks),
-                        "updated_at": datetime.now().isoformat(),
-                    }
-                )
-                await self.doc_status.upsert({doc_id: doc_status})
+                await self.doc_status.upsert(
+                    {
+                        doc_id: {
+                            "status": DocStatus.PROCESSED,
+                            "chunks_count": len(chunks),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    }
+                )
                 await self._insert_done()
             except Exception as e:
-                # Update status with failed information
-                doc_status.update(
-                    {
-                        "status": DocStatus.FAILED,
-                        "error": str(e),
-                        "updated_at": datetime.now().isoformat(),
-                    }
-                )
-                await self.doc_status.upsert({doc_id: doc_status})
-                logger.error(
-                    f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
-                )
+                logger.error(
+                    f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+                )
+                await self.doc_status.upsert(
+                    {
+                        doc_id: {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    }
+                )
                 continue
@@ -674,102 +664,6 @@ class LightRAG:
logger.error("Failed to extract entities and relationships") logger.error("Failed to extract entities and relationships")
raise e raise e
# async def apipeline_process_extract_graph(self):
# """
# Process pending or failed chunks to extract entities and relationships.
# This method retrieves all chunks that are currently marked as pending or have previously failed.
# It then extracts entities and relationships from each chunk and updates the status accordingly.
# Steps:
# 1. Retrieve all pending and failed chunks.
# 2. For each chunk, attempt to extract entities and relationships.
# 3. Update the chunk's status to processed if successful, or failed if an error occurs.
# Raises:
# Exception: If there is an error during the extraction process.
# Returns:
# None
# """
# # 1. get all pending and failed chunks
# to_process_doc_keys: list[str] = []
# # Process failes
# to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
# if to_process_docs:
# to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
# # Process Pending
# to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
# if to_process_docs:
# to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
# if not to_process_doc_keys:
# logger.info("All documents have been processed or are duplicates")
# return
# # Process documents in batches
# batch_size = self.addon_params.get("insert_batch_size", 10)
# semaphore = asyncio.Semaphore(
# batch_size
# ) # Control the number of tasks that are processed simultaneously
# async def process_chunk(chunk_id: str):
# async with semaphore:
# chunks: dict[str, Any] = {
# i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
# }
# async def _process_chunk(chunk_id: str):
# chunks: dict[str, Any] = {
# i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
# }
# # Extract and store entities and relationships
# try:
# maybe_new_kg = await extract_entities(
# chunks,
# knowledge_graph_inst=self.chunk_entity_relation_graph,
# entity_vdb=self.entities_vdb,
# relationships_vdb=self.relationships_vdb,
# llm_response_cache=self.llm_response_cache,
# global_config=asdict(self),
# )
# if maybe_new_kg is None:
# logger.warning("No entities or relationships extracted!")
# # Update status to processed
# await self.text_chunks.upsert(chunks)
# await self.doc_status.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
# except Exception as e:
# logger.error("Failed to extract entities and relationships")
# # Mark as failed if any step fails
# await self.doc_status.upsert({chunk_id: {"status": DocStatus.FAILED}})
# raise e
# with tqdm_async(
# total=len(to_process_doc_keys),
# desc="\nLevel 1 - Processing chunks",
# unit="chunk",
# position=0,
# ) as progress:
# tasks: list[asyncio.Task[None]] = []
# for chunk_id in to_process_doc_keys:
# task = asyncio.create_task(process_chunk(chunk_id))
# tasks.append(task)
# for future in asyncio.as_completed(tasks):
# await future
# progress.update(1)
# progress.set_postfix(
# {
# "LLM call": statistic_data["llm_call"],
# "LLM cache": statistic_data["llm_cache"],
# }
# )
# # Ensure all indexes are updated after each document
async def _insert_done(self): async def _insert_done(self):
tasks = [] tasks = []
for storage_inst in [ for storage_inst in [
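
Read end to end, the change replaces a shared, mutated doc_status dict with one self-contained status record per document id, upserted once when processing starts and once when it succeeds or fails. The following minimal sketch illustrates that pattern in isolation under stated assumptions: InMemoryDocStatusStore, process_documents, and the DocStatus values below are illustrative stand-ins, not LightRAG's actual storage classes or API.

# Minimal sketch of the per-document status-upsert pattern (illustrative names only).
import asyncio
from datetime import datetime
from enum import Enum
from typing import Any


class DocStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"


class InMemoryDocStatusStore:
    """Stand-in for a doc-status storage; only get_by_id/upsert are sketched."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def get_by_id(self, doc_id: str) -> dict[str, Any]:
        return self._data[doc_id]

    async def upsert(self, records: dict[str, dict[str, Any]]) -> None:
        # Merge the incoming fields into whatever is already stored per doc id.
        for doc_id, fields in records.items():
            self._data.setdefault(doc_id, {}).update(fields)


async def process_documents(
    store: InMemoryDocStatusStore, doc_ids: list[str], batch_size: int = 10
) -> None:
    batches = [doc_ids[i : i + batch_size] for i in range(0, len(doc_ids), batch_size)]
    for batch in batches:
        for doc_id in batch:
            status_doc = await store.get_by_id(doc_id)
            # One upsert per document, carrying the full status payload.
            await store.upsert(
                {
                    doc_id: {
                        "status": DocStatus.PROCESSING,
                        "updated_at": datetime.now().isoformat(),
                        "content_summary": status_doc.get("content_summary", ""),
                    }
                }
            )
            try:
                # Placeholder for chunking, graph extraction, and vector upserts.
                await asyncio.sleep(0)
                await store.upsert(
                    {
                        doc_id: {
                            "status": DocStatus.PROCESSED,
                            "updated_at": datetime.now().isoformat(),
                        }
                    }
                )
            except Exception as e:
                await store.upsert(
                    {
                        doc_id: {
                            "status": DocStatus.FAILED,
                            "error": str(e),
                            "updated_at": datetime.now().isoformat(),
                        }
                    }
                )


async def main() -> None:
    store = InMemoryDocStatusStore()
    await store.upsert({"doc-1": {"status": DocStatus.PENDING, "content_summary": "hello"}})
    await process_documents(store, ["doc-1"])
    print(await store.get_by_id("doc-1"))


if __name__ == "__main__":
    asyncio.run(main())

Because every write is a complete record keyed by doc_id, there is no batch-level state to reset between documents, which is what lets the commit drop the shared doc_status dict and the extra upsert calls.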