From 8cf9f04dbcb498ea73a6dbe7b84accb42f30678f Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 4 Mar 2025 14:59:50 +0800 Subject: [PATCH 1/6] Improved task handling and error management in LightRAG - Created tasks with references for cancellation - Prioritized entity relation task execution - Implemented task cancellation on failure - Added error logging to pipeline_status --- lightrag/lightrag.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index a36934e8..f15a7a3a 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -845,7 +845,8 @@ class LightRAG: ) } # Process document (text chunks and full docs) in parallel - tasks = [ + # Create tasks with references for potential cancellation + doc_status_task = asyncio.create_task( self.doc_status.upsert( { doc_id: { @@ -857,15 +858,20 @@ class LightRAG: "created_at": status_doc.created_at, } } - ), - self.chunks_vdb.upsert(chunks), - self._process_entity_relation_graph(chunks), - self.full_docs.upsert( - {doc_id: {"content": status_doc.content}} - ), - self.text_chunks.upsert(chunks), - ] + ) + ) + chunks_vdb_task = asyncio.create_task(self.chunks_vdb.upsert(chunks)) + entity_relation_task = asyncio.create_task(self._process_entity_relation_graph(chunks)) + full_docs_task = asyncio.create_task( + self.full_docs.upsert({doc_id: {"content": status_doc.content}}) + ) + text_chunks_task = asyncio.create_task(self.text_chunks.upsert(chunks)) + + tasks = [doc_status_task, chunks_vdb_task, entity_relation_task, full_docs_task, text_chunks_task] try: + # Wait for entity_relation_task first as it's critical + await entity_relation_task + # If successful, wait for other tasks await asyncio.gather(*tasks) await self.doc_status.upsert( { @@ -881,9 +887,18 @@ class LightRAG: } ) except Exception as e: - logger.error( - f"Failed to process document {doc_id}: {str(e)}" - ) + # Log error and update pipeline status + error_msg = f"Failed to process document {doc_id}: {str(e)}" + logger.error(error_msg) + pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append(error_msg) + + # Cancel other tasks as they are no longer meaningful + for task in [chunks_vdb_task, full_docs_task, text_chunks_task]: + if not task.done(): + task.cancel() + + # Update document status to failed await self.doc_status.upsert( { doc_id: { From 735231d8515842bcf648c41b0ba84326587445df Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 4 Mar 2025 15:30:52 +0800 Subject: [PATCH 2/6] No need the await entity_relation_task first --- lightrag/lightrag.py | 43 ++++++++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index f15a7a3a..74121cf1 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -860,18 +860,28 @@ class LightRAG: } ) ) - chunks_vdb_task = asyncio.create_task(self.chunks_vdb.upsert(chunks)) - entity_relation_task = asyncio.create_task(self._process_entity_relation_graph(chunks)) - full_docs_task = asyncio.create_task( - self.full_docs.upsert({doc_id: {"content": status_doc.content}}) + chunks_vdb_task = asyncio.create_task( + self.chunks_vdb.upsert(chunks) ) - text_chunks_task = asyncio.create_task(self.text_chunks.upsert(chunks)) - - tasks = [doc_status_task, chunks_vdb_task, entity_relation_task, full_docs_task, text_chunks_task] + entity_relation_task = asyncio.create_task( + self._process_entity_relation_graph(chunks) + ) + 
full_docs_task = asyncio.create_task( + self.full_docs.upsert( + {doc_id: {"content": status_doc.content}} + ) + ) + text_chunks_task = asyncio.create_task( + self.text_chunks.upsert(chunks) + ) + tasks = [ + doc_status_task, + chunks_vdb_task, + entity_relation_task, + full_docs_task, + text_chunks_task, + ] try: - # Wait for entity_relation_task first as it's critical - await entity_relation_task - # If successful, wait for other tasks await asyncio.gather(*tasks) await self.doc_status.upsert( { @@ -888,13 +898,20 @@ class LightRAG: ) except Exception as e: # Log error and update pipeline status - error_msg = f"Failed to process document {doc_id}: {str(e)}" + error_msg = ( + f"Failed to process document {doc_id}: {str(e)}" + ) logger.error(error_msg) pipeline_status["latest_message"] = error_msg pipeline_status["history_messages"].append(error_msg) # Cancel other tasks as they are no longer meaningful - for task in [chunks_vdb_task, full_docs_task, text_chunks_task]: + for task in [ + chunks_vdb_task, + entity_relation_task, + full_docs_task, + text_chunks_task, + ]: if not task.done(): task.cancel() @@ -941,7 +958,7 @@ class LightRAG: pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - # 获取新的待处理文档 + # Check for pending documents again processing_docs, failed_docs, pending_docs = await asyncio.gather( self.doc_status.get_docs_by_status(DocStatus.PROCESSING), self.doc_status.get_docs_by_status(DocStatus.FAILED), From 5e7ef39998c4cc6a7ca0b8bd3fd8300c12a2edee Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:12:01 +0800 Subject: [PATCH 3/6] Update operate.py --- lightrag/operate.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lightrag/operate.py b/lightrag/operate.py index 7db42284..30983145 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1242,9 +1242,11 @@ async def _find_most_related_text_unit_from_entities( all_text_units_lookup = {} tasks = [] + for index, (this_text_units, this_edges) in enumerate(zip(text_units, edges)): for c_id in this_text_units: if c_id not in all_text_units_lookup: + all_text_units_lookup[c_id] = index tasks.append((c_id, index, this_edges)) results = await asyncio.gather( From 06fc65d9a0168f01ba657548fea55f54c35c2a6b Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:26:28 +0800 Subject: [PATCH 4/6] Revert "[EVO] Add language configuration to environment and argument parsing" This reverts commit a688b8822a8a9eb3853781bb5c71029a22aa5396. --- env.example | 2 +- lightrag/api/lightrag_server.py | 3 --- lightrag/api/utils_api.py | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/env.example b/env.example index 294a5b68..99909ac6 100644 --- a/env.example +++ b/env.example @@ -48,7 +48,7 @@ # CHUNK_OVERLAP_SIZE=100 # MAX_TOKENS=32768 # Max tokens send to LLM for summarization # MAX_TOKEN_SUMMARY=500 # Max tokens for entity or relations summary -# LANGUAGE=English +# SUMMARY_LANGUAGE=English # MAX_EMBED_TOKENS=8192 ### LLM Configuration (Use valid host. 
For local services installed with docker, you can use host.docker.internal) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 775dc5e3..8ad232f0 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -331,9 +331,6 @@ def create_app(args): "use_llm_check": False, }, namespace_prefix=args.namespace_prefix, - addon_params={ - "language": args.language, - }, auto_manage_storages_states=False, ) else: # azure_openai diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index f865682b..ed1250d4 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -340,7 +340,6 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace: # Inject chunk configuration args.chunk_size = get_env_value("CHUNK_SIZE", 1200, int) args.chunk_overlap_size = get_env_value("CHUNK_OVERLAP_SIZE", 100, int) - args.language = get_env_value("LANGUAGE", "English") ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name From 649164c3e692659aeabc966e53e0af415ca15863 Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Wed, 5 Mar 2025 16:55:09 +0800 Subject: [PATCH 5/6] Update lightrag.py --- lightrag/lightrag.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index e8e468af..f81ade0b 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1945,6 +1945,9 @@ class LightRAG: new_entity_name, new_node_data ) + # Store relationships that need to be updated + relations_to_update = [] + # Get all edges related to the original entity edges = await self.chunk_entity_relation_graph.get_node_edges( entity_name @@ -1960,10 +1963,12 @@ class LightRAG: await self.chunk_entity_relation_graph.upsert_edge( new_entity_name, target, edge_data ) + relations_to_update.append((new_entity_name, target, edge_data)) else: # target == entity_name await self.chunk_entity_relation_graph.upsert_edge( source, new_entity_name, edge_data ) + relations_to_update.append((source, new_entity_name, edge_data)) # Delete old entity await self.chunk_entity_relation_graph.delete_node(entity_name) @@ -1972,6 +1977,35 @@ class LightRAG: old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") await self.entities_vdb.delete([old_entity_id]) + # Update relationship vector representations + for src, tgt, edge_data in relations_to_update: + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + # Create new content for embedding + content = f"{src}\t{tgt}\n{keywords}\n{description}" + + # Calculate relationship ID + relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": src, + "tgt_id": tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await self.relationships_vdb.upsert(relation_data) + # Update working entity name to new name entity_name = new_entity_name else: @@ -2082,7 +2116,7 @@ class LightRAG: weight = float(new_edge_data.get("weight", 1.0)) # Create content for embedding - content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" + content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}" # Calculate relation ID relation_id = compute_mdhash_id( 
From 917dc39334d99120f98ede2c7cb13c70bdf78e24 Mon Sep 17 00:00:00 2001 From: zrguo <49157727+LarFii@users.noreply.github.com> Date: Wed, 5 Mar 2025 17:00:01 +0800 Subject: [PATCH 6/6] fix linting --- lightrag/lightrag.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index f81ade0b..5c060658 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1963,12 +1963,16 @@ class LightRAG: await self.chunk_entity_relation_graph.upsert_edge( new_entity_name, target, edge_data ) - relations_to_update.append((new_entity_name, target, edge_data)) + relations_to_update.append( + (new_entity_name, target, edge_data) + ) else: # target == entity_name await self.chunk_entity_relation_graph.upsert_edge( source, new_entity_name, edge_data ) - relations_to_update.append((source, new_entity_name, edge_data)) + relations_to_update.append( + (source, new_entity_name, edge_data) + ) # Delete old entity await self.chunk_entity_relation_graph.delete_node(entity_name) @@ -1983,13 +1987,13 @@ class LightRAG: keywords = edge_data.get("keywords", "") source_id = edge_data.get("source_id", "") weight = float(edge_data.get("weight", 1.0)) - + # Create new content for embedding content = f"{src}\t{tgt}\n{keywords}\n{description}" - + # Calculate relationship ID relation_id = compute_mdhash_id(src + tgt, prefix="rel-") - + # Prepare data for vector database update relation_data = { relation_id: { @@ -2002,7 +2006,7 @@ class LightRAG: "weight": weight, } } - + # Update vector database await self.relationships_vdb.upsert(relation_data)
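The diffs above are verbatim patches; the sketches below only restate the patterns they introduce, in hedged, self-contained Python. First, the task handling from patches 1-2: per-document upserts and entity-relation extraction run as named asyncio tasks so that a failure in any of them can cancel the rest and be reported through the shared pipeline status. The `storages` bundle and the plain-string statuses are stand-ins, not the actual LightRAG interfaces.

import asyncio
import logging

logger = logging.getLogger(__name__)


async def process_document(doc_id, chunks, storages, pipeline_status):
    """Illustrative only: run the per-document upserts concurrently and
    cancel whatever is still pending if one of them fails."""
    # Keep references to every task so they can be cancelled individually
    chunks_vdb_task = asyncio.create_task(storages.chunks_vdb.upsert(chunks))
    entity_relation_task = asyncio.create_task(
        storages.extract_entity_relations(chunks)
    )
    text_chunks_task = asyncio.create_task(storages.text_chunks.upsert(chunks))
    tasks = [chunks_vdb_task, entity_relation_task, text_chunks_task]

    try:
        await asyncio.gather(*tasks)
        await storages.doc_status.upsert({doc_id: {"status": "processed"}})
    except Exception as e:
        # Log the error and surface it through the shared pipeline status
        error_msg = f"Failed to process document {doc_id}: {e}"
        logger.error(error_msg)
        pipeline_status["latest_message"] = error_msg
        pipeline_status["history_messages"].append(error_msg)

        # Cancel the tasks that have not finished; their results are no
        # longer meaningful once the document has failed
        for task in tasks:
            if not task.done():
                task.cancel()

        await storages.doc_status.upsert(
            {doc_id: {"status": "failed", "error": str(e)}}
        )

Patch 2 then drops the extra "await entity_relation_task first" step: asyncio.gather already propagates the first failure, so no task needs special ordering.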
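Next, the one-line fix in patch 3 (operate.py): each chunk id is recorded in all_text_units_lookup the moment its fetch task is queued, so entities that share text units no longer enqueue duplicate tasks for the same chunk. A reduced version of that loop, with hypothetical inputs:

def collect_unique_text_units(text_units, edges):
    """Illustrative only: queue each chunk id at most once, remembering the
    index of the first entity that referenced it."""
    all_text_units_lookup = {}
    tasks = []
    for index, (this_text_units, this_edges) in enumerate(zip(text_units, edges)):
        for c_id in this_text_units:
            if c_id not in all_text_units_lookup:
                # Mark the id immediately so later iterations skip duplicates
                all_text_units_lookup[c_id] = index
                tasks.append((c_id, index, this_edges))
    return tasks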
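Finally, the entity-rename path from patch 5: every edge re-attached to the new entity name is collected in relations_to_update and re-embedded in the relationships vector store, and the embedding text is standardized to source and target joined by a tab, followed by keywords and description on separate lines, matching the edge-creation path. compute_mdhash_id below is a stand-in approximating the helper referenced in the diff; relationships_vdb is any store with the same upsert signature.

from hashlib import md5


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Stand-in for the utility referenced in the diff: prefix + md5 hex digest
    return prefix + md5(content.encode()).hexdigest()


async def reembed_relations(relations_to_update, relationships_vdb):
    """Illustrative only: rebuild vector-store entries for edges that were
    re-pointed at a renamed entity."""
    for src, tgt, edge_data in relations_to_update:
        description = edge_data.get("description", "")
        keywords = edge_data.get("keywords", "")
        source_id = edge_data.get("source_id", "")
        weight = float(edge_data.get("weight", 1.0))

        # Embedding text layout used throughout the patch:
        # "<src>\t<tgt>\n<keywords>\n<description>"
        content = f"{src}\t{tgt}\n{keywords}\n{description}"
        relation_id = compute_mdhash_id(src + tgt, prefix="rel-")

        await relationships_vdb.upsert(
            {
                relation_id: {
                    "content": content,
                    "src_id": src,
                    "tgt_id": tgt,
                    "source_id": source_id,
                    "description": description,
                    "keywords": keywords,
                    "weight": weight,
                }
            }
        )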