From a809bc7945dd5b0ec64862ef527cc721628cf5c7 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 4 Apr 2025 00:06:42 +0800
Subject: [PATCH] Optimize parallel processing on chunks extraction

---
 lightrag/operate.py | 151 ++++++++++++++++++--------------------------
 1 file changed, 63 insertions(+), 88 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index bb0413b5..d14426ab 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -25,7 +25,6 @@ from .utils import (
     CacheData,
     statistic_data,
     get_conversation_turns,
-    verbose_debug,
 )
 from .base import (
     BaseGraphStorage,
@@ -441,6 +440,12 @@ async def extract_entities(

     processed_chunks = 0
     total_chunks = len(ordered_chunks)
+    total_entities_count = 0
+    total_relations_count = 0
+
+    # Get lock manager from shared storage
+    from .kg.shared_storage import get_graph_db_lock
+    graph_db_lock = get_graph_db_lock(enable_logging=False)

     async def _user_llm_func_with_cache(
         input_text: str, history_messages: list[dict[str, str]] = None
@@ -539,7 +544,7 @@ async def extract_entities(
             chunk_key_dp (tuple[str, TextChunkSchema]):
                 ("chunk-xxxxxx", {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int})
         """
-        nonlocal processed_chunks
+        nonlocal processed_chunks, total_entities_count, total_relations_count
         chunk_key = chunk_key_dp[0]
         chunk_dp = chunk_key_dp[1]
         content = chunk_dp["content"]
@@ -597,100 +602,70 @@ async def extract_entities(
             async with pipeline_status_lock:
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
-        return dict(maybe_nodes), dict(maybe_edges)
+
+        # Use graph database lock to ensure atomic merges and updates
+        chunk_entities_data = []
+        chunk_relationships_data = []
+
+        async with graph_db_lock:
+            # Process and update entities
+            for entity_name, entities in maybe_nodes.items():
+                entity_data = await _merge_nodes_then_upsert(
+                    entity_name, entities, knowledge_graph_inst, global_config
+                )
+                chunk_entities_data.append(entity_data)
+
+            # Process and update relationships
+            for edge_key, edges in maybe_edges.items():
+                # Ensure edge direction consistency
+                sorted_edge_key = tuple(sorted(edge_key))
+                edge_data = await _merge_edges_then_upsert(
+                    sorted_edge_key[0], sorted_edge_key[1], edges, knowledge_graph_inst, global_config
+                )
+                chunk_relationships_data.append(edge_data)
+
+            # Update vector database (within the same lock to ensure atomicity)
+            if entity_vdb is not None and chunk_entities_data:
+                data_for_vdb = {
+                    compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
+                        "entity_name": dp["entity_name"],
+                        "entity_type": dp["entity_type"],
+                        "content": f"{dp['entity_name']}\n{dp['description']}",
+                        "source_id": dp["source_id"],
+                        "file_path": dp.get("file_path", "unknown_source"),
+                    }
+                    for dp in chunk_entities_data
+                }
+                await entity_vdb.upsert(data_for_vdb)
+
+            if relationships_vdb is not None and chunk_relationships_data:
+                data_for_vdb = {
+                    compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
+                        "src_id": dp["src_id"],
+                        "tgt_id": dp["tgt_id"],
+                        "keywords": dp["keywords"],
+                        "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
+                        "source_id": dp["source_id"],
+                        "file_path": dp.get("file_path", "unknown_source"),
+                    }
+                    for dp in chunk_relationships_data
+                }
+                await relationships_vdb.upsert(data_for_vdb)
+
+            # Update counters
+            total_entities_count += len(chunk_entities_data)
+            total_relations_count += len(chunk_relationships_data)
+
     # Handle all chunks in parallel
     tasks = [_process_single_content(c) for c in ordered_chunks]
-    results = await asyncio.gather(*tasks)
+    await asyncio.gather(*tasks)

-    maybe_nodes = defaultdict(list)
-    maybe_edges = defaultdict(list)
-    for m_nodes, m_edges in results:
-        for k, v in m_nodes.items():
-            maybe_nodes[k].extend(v)
-        for k, v in m_edges.items():
-            maybe_edges[tuple(sorted(k))].extend(v)
-
-    from .kg.shared_storage import get_graph_db_lock
-
-    graph_db_lock = get_graph_db_lock(enable_logging=False)
-
-    # Ensure that nodes and edges are merged and upserted atomically
-    async with graph_db_lock:
-        # serial processing nodes under lock
-        all_entities_data = []
-        for k, v in maybe_nodes.items():
-            entity_data = await _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
-            all_entities_data.append(entity_data)
-
-        # serial processing edges under lock
-        all_relationships_data = []
-        for k, v in maybe_edges.items():
-            edge_data = await _merge_edges_then_upsert(
-                k[0], k[1], v, knowledge_graph_inst, global_config
-            )
-            all_relationships_data.append(edge_data)
-
-        if not (all_entities_data or all_relationships_data):
-            log_message = "Didn't extract any entities and relationships."
-            logger.info(log_message)
-            if pipeline_status is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-            return
-
-        if not all_entities_data:
-            log_message = "Didn't extract any entities"
-            logger.info(log_message)
-            if pipeline_status is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-        if not all_relationships_data:
-            log_message = "Didn't extract any relationships"
-            logger.info(log_message)
-            if pipeline_status is not None:
-                async with pipeline_status_lock:
-                    pipeline_status["latest_message"] = log_message
-                    pipeline_status["history_messages"].append(log_message)
-
-    log_message = f"Extracted {len(all_entities_data)} entities + {len(all_relationships_data)} relationships (deduplicated)"
+    log_message = f"Extracted {total_entities_count} entities + {total_relations_count} relationships (total)"
     logger.info(log_message)
     if pipeline_status is not None:
         async with pipeline_status_lock:
             pipeline_status["latest_message"] = log_message
             pipeline_status["history_messages"].append(log_message)
-    verbose_debug(
-        f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
-    )
-    verbose_debug(f"New relationships:{all_relationships_data}")
-
-    if entity_vdb is not None:
-        data_for_vdb = {
-            compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
-                "entity_name": dp["entity_name"],
-                "entity_type": dp["entity_type"],
-                "content": f"{dp['entity_name']}\n{dp['description']}",
-                "source_id": dp["source_id"],
-                "file_path": dp.get("file_path", "unknown_source"),
-            }
-            for dp in all_entities_data
-        }
-        await entity_vdb.upsert(data_for_vdb)
-
-    if relationships_vdb is not None:
-        data_for_vdb = {
-            compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
-                "src_id": dp["src_id"],
-                "tgt_id": dp["tgt_id"],
-                "keywords": dp["keywords"],
-                "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
-                "source_id": dp["source_id"],
-                "file_path": dp.get("file_path", "unknown_source"),
-            }
-            for dp in all_relationships_data
-        }
-        await relationships_vdb.upsert(data_for_vdb)


 async def kg_query(
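For review context: the concurrency shape this commit moves to — fan out LLM extraction across all chunks with asyncio.gather, while serializing each chunk's merge into shared storage behind one shared async lock — can be sketched in isolation. The sketch below is illustrative only: extract_from_chunk, GRAPH, and process_chunk are hypothetical stand-ins, not LightRAG APIs; only the locking pattern mirrors the patch.

```python
import asyncio

GRAPH: list[str] = []  # stand-in for the shared graph/vector stores

async def extract_from_chunk(chunk: str) -> list[str]:
    # Stand-in for the slow LLM extraction call; these run in parallel.
    await asyncio.sleep(0.01)
    return [f"entity:{chunk}"]

async def process_chunk(chunk: str, graph_lock: asyncio.Lock) -> int:
    # Extraction (the expensive part) overlaps freely across chunks...
    entities = await extract_from_chunk(chunk)
    # ...but each chunk's merge and counter update is serialized under
    # the shared lock, so storage writes stay atomic per chunk.
    async with graph_lock:
        GRAPH.extend(entities)
        return len(entities)

async def main() -> None:
    graph_lock = asyncio.Lock()
    chunks = ["a", "b", "c"]
    counts = await asyncio.gather(
        *(process_chunk(c, graph_lock) for c in chunks)
    )
    print(f"Extracted {sum(counts)} entities (total)")

asyncio.run(main())
```

Because totals are now accumulated per chunk inside the lock rather than computed once from a globally deduplicated merge, the final log line's suffix changes from "(deduplicated)" to "(total)", as seen in the diff above.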