From 18c077040939c7e5a90a90e06af1b0da3c6911f6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 9 Mar 2025 00:24:55 +0800 Subject: [PATCH] fix: duplicate nodes for same entity(label) problem in Neo4j - Add entity_id field as key in Neo4j nodes - Use entity_id for nodes retrieval and upsert --- lightrag/kg/neo4j_impl.py | 106 ++++++++++++++++++++++++++------------ lightrag/operate.py | 2 + 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 1e46798a..0b660d68 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -280,12 +280,10 @@ class Neo4JStorage(BaseGraphStorage): database=self._DATABASE, default_access_mode="READ" ) as session: try: - query = f"MATCH (n:`{entity_name_label}`) RETURN n" - result = await session.run(query) + query = f"MATCH (n:`{entity_name_label}` {{entity_id: $entity_id}}) RETURN n" + result = await session.run(query, entity_id=entity_name_label) try: - records = await result.fetch( - 2 - ) # Get up to 2 records to check for duplicates + records = await result.fetch(2) # Get 2 records for duplication check if len(records) > 1: logger.warning( @@ -549,12 +547,14 @@ class Neo4JStorage(BaseGraphStorage): """ label = self._ensure_label(node_id) properties = node_data + if "entity_id" not in properties: + raise ValueError("Neo4j: node properties must contain an 'entity_id' field") try: async with self._driver.session(database=self._DATABASE) as session: async def execute_upsert(tx: AsyncManagedTransaction): query = f""" - MERGE (n:`{label}`) + MERGE (n:`{label}` {{entity_id: $properties.entity_id}}) SET n += $properties """ result = await tx.run(query, properties=properties) @@ -568,6 +568,56 @@ class Neo4JStorage(BaseGraphStorage): logger.error(f"Error during upsert: {str(e)}") raise + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type( + ( + neo4jExceptions.ServiceUnavailable, 
neo4jExceptions.TransientError, + neo4jExceptions.WriteServiceUnavailable, + neo4jExceptions.ClientError, + ) + ), + ) + async def _get_unique_node_entity_id(self, node_label: str) -> str: + """ + Get the entity_id of a node with the given label, ensuring the node is unique. + + Args: + node_label (str): Label of the node to check + + Returns: + str: The entity_id of the unique node + + Raises: + ValueError: If no node with the given label exists or if multiple nodes have the same label + """ + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: + query = f""" + MATCH (n:`{node_label}`) + RETURN n, count(n) as node_count + """ + result = await session.run(query) + try: + records = await result.fetch(2) # We only need to know if there are 0, 1, or >1 nodes + + if not records or records[0]["node_count"] == 0: + raise ValueError(f"Neo4j: node with label '{node_label}' does not exist") + + if records[0]["node_count"] > 1: + raise ValueError(f"Neo4j: multiple nodes found with label '{node_label}', cannot determine unique node") + + node = records[0]["n"] + if "entity_id" not in node: + raise ValueError(f"Neo4j: node with label '{node_label}' does not have an entity_id property") + + return node["entity_id"] + finally: + await result.consume() # Ensure result is fully consumed + @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10), @@ -585,7 +635,8 @@ class Neo4JStorage(BaseGraphStorage): ) -> None: """ Upsert an edge and its properties between two nodes identified by their labels. - Checks if both source and target nodes exist before creating the edge. + Ensures both source and target nodes exist and are unique before creating the edge. + Uses entity_id property to uniquely identify nodes. 
Args: source_node_id (str): Label of the source node (used as identifier) @@ -593,52 +644,39 @@ class Neo4JStorage(BaseGraphStorage): edge_data (dict): Dictionary of properties to set on the edge Raises: - ValueError: If either source or target node does not exist + ValueError: If either source or target node does not exist or is not unique """ source_label = self._ensure_label(source_node_id) target_label = self._ensure_label(target_node_id) edge_properties = edge_data - # Check if both nodes exist - source_exists = await self.has_node(source_label) - target_exists = await self.has_node(target_label) - - if not source_exists: - raise ValueError( - f"Neo4j: source node with label '{source_label}' does not exist" - ) - if not target_exists: - raise ValueError( - f"Neo4j: target node with label '{target_label}' does not exist" - ) + # Get entity_ids for source and target nodes, ensuring they are unique + source_entity_id = await self._get_unique_node_entity_id(source_label) + target_entity_id = await self._get_unique_node_entity_id(target_label) try: async with self._driver.session(database=self._DATABASE) as session: async def execute_upsert(tx: AsyncManagedTransaction): query = f""" - MATCH (source:`{source_label}`) + MATCH (source:`{source_label}` {{entity_id: $source_entity_id}}) WITH source - MATCH (target:`{target_label}`) + MATCH (target:`{target_label}` {{entity_id: $target_entity_id}}) MERGE (source)-[r:DIRECTED]-(target) SET r += $properties RETURN r, source, target """ - result = await tx.run(query, properties=edge_properties) + result = await tx.run( + query, + source_entity_id=source_entity_id, + target_entity_id=target_entity_id, + properties=edge_properties + ) try: records = await result.fetch(100) - if len(records) > 1: - source_nodes = [dict(r['source']) for r in records] - target_nodes = [dict(r['target']) for r in records] - logger.warning( - f"Multiple edges created: found {len(records)} results for edge between " - f"source label 
'{source_label}' and target label '{target_label}'. " - f"Source nodes: {source_nodes}, " - f"Target nodes: {target_nodes}. " - "Using first edge only." - ) if records: logger.debug( - f"Upserted edge from '{source_label}' to '{target_label}' " + f"Upserted edge from '{source_label}' (entity_id: {source_entity_id}) " + f"to '{target_label}' (entity_id: {target_entity_id}) " f"with properties: {edge_properties}" ) finally: diff --git a/lightrag/operate.py b/lightrag/operate.py index f89a551d..fb7b27a0 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -220,6 +220,7 @@ async def _merge_nodes_then_upsert( entity_name, description, global_config ) node_data = dict( + entity_id=entity_name, entity_type=entity_type, description=description, source_id=source_id, @@ -301,6 +302,7 @@ async def _merge_edges_then_upsert( await knowledge_graph_inst.upsert_node( need_insert_id, node_data={ + "entity_id": need_insert_id, "source_id": source_id, "description": description, "entity_type": "UNKNOWN",