From 6b240fa9b2d0d62761500df29bb2529171d6da01 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 3 Apr 2025 21:33:46 +0800 Subject: [PATCH] Serialize merge process to prevent race conditions --- lightrag/kg/neo4j_impl.py | 4 ++-- lightrag/kg/postgres_impl.py | 3 +++ lightrag/operate.py | 26 ++++++++++++-------------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 72978b60..82bb54a2 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -832,8 +832,8 @@ class Neo4JStorage(BaseGraphStorage): seen_edges.add(edge_id) logger.info( - f"Process {os.getpid()} graph query return: {len(result.nodes)} nodes, {len(result.edges)} edges" - ) + f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}" + ) except neo4jExceptions.ClientError as e: logger.warning(f"APOC plugin error: {str(e)}") diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index b1b5dee8..7ae4476f 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1612,6 +1612,9 @@ class PGGraphStorage(BaseGraphStorage): is_truncated=is_truncated, ) + logger.info( + f"Subgraph query successful | Node count: {len(kg.nodes)} | Edge count: {len(kg.edges)}" + ) return kg async def drop(self) -> dict[str, str]: diff --git a/lightrag/operate.py b/lightrag/operate.py index e0351075..bb0413b5 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -616,21 +616,19 @@ async def extract_entities( # Ensure that nodes and edges are merged and upserted atomically async with graph_db_lock: - all_entities_data = await asyncio.gather( - *[ - _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config) - for k, v in maybe_nodes.items() - ] - ) + # serial processing nodes under lock + all_entities_data = [] + for k, v in maybe_nodes.items(): + entity_data = await _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config) + 
all_entities_data.append(entity_data) - all_relationships_data = await asyncio.gather( - *[ - _merge_edges_then_upsert( - k[0], k[1], v, knowledge_graph_inst, global_config - ) - for k, v in maybe_edges.items() - ] - ) + # serial processing edges under lock + all_relationships_data = [] + for k, v in maybe_edges.items(): + edge_data = await _merge_edges_then_upsert( + k[0], k[1], v, knowledge_graph_inst, global_config + ) + all_relationships_data.append(edge_data) if not (all_entities_data or all_relationships_data): log_message = "Didn't extract any entities and relationships."