From ec0450f7121e82f7f5aee2fd6e96903187a33283 Mon Sep 17 00:00:00 2001
From: zrguo
Date: Thu, 6 Mar 2025 00:53:23 +0800
Subject: [PATCH] Add merge entities

---
 README.md            |  70 ++++++++
 lightrag/lightrag.py | 389 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 459 insertions(+)

diff --git a/README.md b/README.md
index 57563a1f..00da54fb 100644
--- a/README.md
+++ b/README.md
@@ -849,6 +849,76 @@ All operations are available in both synchronous and asynchronous versions. The
 
 These operations maintain data consistency across both the graph database and vector database components, ensuring your knowledge graph remains coherent.
 
+## Entity Merging
+
+<details>
+  <summary> <b>Merge Entities and Their Relationships</b> </summary>
+
+LightRAG now supports merging multiple entities into a single entity, automatically redirecting and deduplicating all of their relationships:
+
+```python
+# Basic entity merging
+rag.merge_entities(
+    source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
+    target_entity="AI Technology"
+)
+```
+
+With a custom merge strategy:
+
+```python
+# Define a custom merge strategy for different fields
+rag.merge_entities(
+    source_entities=["John Smith", "Dr. Smith", "J. Smith"],
+    target_entity="John Smith",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "entity_type": "keep_first",   # Keep the entity type from the first entity
+        "source_id": "join_unique"     # Combine all unique source IDs
+    }
+)
+```
+
+With custom target entity data:
+
+```python
+# Specify exact values for the merged entity
+rag.merge_entities(
+    source_entities=["New York", "NYC", "Big Apple"],
+    target_entity="New York City",
+    target_entity_data={
+        "entity_type": "LOCATION",
+        "description": "New York City is the most populous city in the United States.",
+    }
+)
+```
+
+Advanced usage combining both approaches:
+
+```python
+# Merge company entities with both a merge strategy and custom data
+rag.merge_entities(
+    source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"],
+    target_entity="Microsoft",
+    merge_strategy={
+        "description": "concatenate",  # Combine all descriptions
+        "source_id": "join_unique"     # Combine source IDs
+    },
+    target_entity_data={
+        "entity_type": "ORGANIZATION",
+    }
+)
+```
+
+When merging entities:
+
+* All relationships from the source entities are redirected to the target entity
+* Duplicate relationships are merged field by field: descriptions are concatenated, keywords and source IDs are deduplicated, and the maximum weight is kept
+* Relationships between merged entities are skipped to prevent self-loops
+* Source entities are removed after merging
+* Relationship weights and other attributes are preserved
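+
+Both a synchronous `merge_entities` and an asynchronous `amerge_entities` variant are provided. A minimal async sketch (the entity names are hypothetical; `amerge_entities` raises `ValueError` if a source entity does not exist):
+
+```python
+import asyncio
+
+async def consolidate(rag):
+    try:
+        # Returns the merged entity's information after the merge completes
+        merged = await rag.amerge_entities(
+            source_entities=["LightRAG", "Light RAG"],
+            target_entity="LightRAG",
+        )
+        print(merged)
+    except ValueError as e:
+        # e.g. "Source entity 'Light RAG' does not exist"
+        print(f"Merge failed: {e}")
+
+asyncio.run(consolidate(rag))
+```
+
+</details>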
+
 ## Cache
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index e8c2bb9c..fa3a3a2b 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -2420,3 +2420,392 @@ class LightRAG:
         return loop.run_until_complete(
             self.acreate_relation(source_entity, target_entity, relation_data)
         )
+
+    async def amerge_entities(
+        self,
+        source_entities: list[str],
+        target_entity: str,
+        merge_strategy: dict[str, str] | None = None,
+        target_entity_data: dict[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """Asynchronously merge multiple entities into one entity.
+
+        Merges multiple source entities into a target entity, handling all
+        relationships and updating both the knowledge graph and the vector database.
+
+        Args:
+            source_entities: List of source entity names to merge
+            target_entity: Name of the target entity after merging
+            merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
+                Supported strategies:
+                - "concatenate": Concatenate all values (for text fields)
+                - "keep_first": Keep the first non-empty value
+                - "keep_last": Keep the last non-empty value
+                - "join_unique": Join all unique values (for fields separated by GRAPH_FIELD_SEP)
+            target_entity_data: Dictionary of specific values to set for the target entity,
+                overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
+
+        Returns:
+            Dictionary containing the merged entity information
+        """
+        try:
+            # Default merge strategy
+            default_strategy = {
+                "description": "concatenate",
+                "entity_type": "keep_first",
+                "source_id": "join_unique",
+            }
+
+            merge_strategy = (
+                default_strategy
+                if merge_strategy is None
+                else {**default_strategy, **merge_strategy}
+            )
+            target_entity_data = (
+                {} if target_entity_data is None else target_entity_data
+            )
+
+            # 1. Check that all source entities exist
+            source_entities_data = {}
+            for entity_name in source_entities:
+                node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
+                if not node_data:
+                    raise ValueError(f"Source entity '{entity_name}' does not exist")
+                source_entities_data[entity_name] = node_data
+
+            # 2. Check if the target entity exists and fetch its current data.
+            # Use a separate variable so the caller-supplied target_entity_data
+            # is not overwritten; it is applied as an override in step 3.
+            target_exists = await self.chunk_entity_relation_graph.has_node(
+                target_entity
+            )
+            existing_target_entity_data = {}
+            if target_exists:
+                existing_target_entity_data = (
+                    await self.chunk_entity_relation_graph.get_node(target_entity)
+                )
+                logger.info(
+                    f"Target entity '{target_entity}' already exists, will merge data"
+                )
+
+            # 3. Merge entity data
+            merged_entity_data = self._merge_entity_attributes(
+                list(source_entities_data.values())
+                + ([existing_target_entity_data] if target_exists else []),
+                merge_strategy,
+            )
+
+            # Apply any explicitly provided target entity data (overrides merged data)
+            for key, value in target_entity_data.items():
+                merged_entity_data[key] = value
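+
+            # Illustrative example (hypothetical values): under the default
+            # strategy, descriptions "A" and "B" merge to "A\n\nB" (concatenate),
+            # the first non-empty entity_type wins (keep_first), and source_ids
+            # "c1<SEP>c2" / "c2<SEP>c3" merge to "c1<SEP>c2<SEP>c3" (join_unique,
+            # where <SEP> stands for GRAPH_FIELD_SEP).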
+
+            # 4. Get all relationships of the source entities
+            all_relations = []
+            for entity_name in source_entities:
+                # Get all relationships where this entity is the source
+                outgoing_edges = await self.chunk_entity_relation_graph.get_node_edges(
+                    entity_name
+                )
+                if outgoing_edges:
+                    for src, tgt in outgoing_edges:
+                        # Ensure src is the current entity
+                        if src == entity_name:
+                            edge_data = await self.chunk_entity_relation_graph.get_edge(
+                                src, tgt
+                            )
+                            all_relations.append(("outgoing", src, tgt, edge_data))
+
+                # Get all relationships where this entity is the target.
+                # Note: this scans every label in the graph (O(N) in the number
+                # of nodes), which is acceptable for moderately sized graphs.
+                incoming_edges = []
+                all_labels = await self.chunk_entity_relation_graph.get_all_labels()
+                for label in all_labels:
+                    if label == entity_name:
+                        continue
+                    node_edges = await self.chunk_entity_relation_graph.get_node_edges(
+                        label
+                    )
+                    for src, tgt in node_edges or []:
+                        if tgt == entity_name:
+                            incoming_edges.append((src, tgt))
+
+                for src, tgt in incoming_edges:
+                    edge_data = await self.chunk_entity_relation_graph.get_edge(
+                        src, tgt
+                    )
+                    all_relations.append(("incoming", src, tgt, edge_data))
+
+            # 5. Create or update the target entity
+            await self.chunk_entity_relation_graph.upsert_node(
+                target_entity, merged_entity_data
+            )
+            if not target_exists:
+                logger.info(f"Created new target entity '{target_entity}'")
+            else:
+                logger.info(f"Updated existing target entity '{target_entity}'")
+
+            # 6. Recreate all relationships, pointing to the target entity
+            relation_updates = {}  # Track relationships that need to be merged
+
+            for rel_type, src, tgt, edge_data in all_relations:
+                new_src = target_entity if src in source_entities else src
+                new_tgt = target_entity if tgt in source_entities else tgt
+
+                # Skip relationships between source entities to avoid self-loops
+                if new_src == new_tgt:
+                    logger.info(
+                        f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
+                    )
+                    continue
+
+                # Check if the same relationship already exists
+                relation_key = f"{new_src}|{new_tgt}"
+                if relation_key in relation_updates:
+                    # Merge relationship data
+                    existing_data = relation_updates[relation_key]["data"]
+                    merged_relation = self._merge_relation_attributes(
+                        [existing_data, edge_data],
+                        {
+                            "description": "concatenate",
+                            "keywords": "join_unique",
+                            "source_id": "join_unique",
+                            "weight": "max",
+                        },
+                    )
+                    relation_updates[relation_key]["data"] = merged_relation
+                    logger.info(
+                        f"Merged duplicate relationship: {new_src} -> {new_tgt}"
+                    )
+                else:
+                    relation_updates[relation_key] = {
+                        "src": new_src,
+                        "tgt": new_tgt,
+                        "data": edge_data.copy(),
+                    }
+
+            # Apply relationship updates
+            for rel_data in relation_updates.values():
+                await self.chunk_entity_relation_graph.upsert_edge(
+                    rel_data["src"], rel_data["tgt"], rel_data["data"]
+                )
+                logger.info(
+                    f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
+                )
+
+            # 7. Update the entity's vector representation
+            description = merged_entity_data.get("description", "")
+            source_id = merged_entity_data.get("source_id", "")
+            entity_type = merged_entity_data.get("entity_type", "")
+            content = target_entity + "\n" + description
+
+            entity_id = compute_mdhash_id(target_entity, prefix="ent-")
+            entity_data_for_vdb = {
+                entity_id: {
+                    "content": content,
+                    "entity_name": target_entity,
+                    "source_id": source_id,
+                    "description": description,
+                    "entity_type": entity_type,
+                }
+            }
+
+            await self.entities_vdb.upsert(entity_data_for_vdb)
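+
+            # Note: the vector record id is an MD5-based hash of the entity name
+            # with an "ent-" prefix, so merging again into the same target
+            # overwrites this embedding rather than adding a duplicate.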
+
+            # 8. Update relationship vector representations
+            for rel_data in relation_updates.values():
+                src = rel_data["src"]
+                tgt = rel_data["tgt"]
+                edge_data = rel_data["data"]
+
+                description = edge_data.get("description", "")
+                keywords = edge_data.get("keywords", "")
+                source_id = edge_data.get("source_id", "")
+                weight = float(edge_data.get("weight", 1.0))
+
+                content = f"{keywords}\t{src}\n{tgt}\n{description}"
+                relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
+
+                relation_data_for_vdb = {
+                    relation_id: {
+                        "content": content,
+                        "src_id": src,
+                        "tgt_id": tgt,
+                        "source_id": source_id,
+                        "description": description,
+                        "keywords": keywords,
+                        "weight": weight,
+                    }
+                }
+
+                await self.relationships_vdb.upsert(relation_data_for_vdb)
+
+            # 9. Delete source entities
+            for entity_name in source_entities:
+                # Delete the entity node from the knowledge graph
+                await self.chunk_entity_relation_graph.delete_node(entity_name)
+                # Delete the entity's record from the vector database
+                entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+                await self.entities_vdb.delete([entity_id])
+                logger.info(f"Deleted source entity '{entity_name}'")
+
+            # Remove stale relationship vectors whose edges referenced a deleted
+            # source entity (the redirected copies were re-upserted above)
+            for _, src, tgt, _ in all_relations:
+                await self.relationships_vdb.delete(
+                    [compute_mdhash_id(src + tgt, prefix="rel-")]
+                )
+
+            # 10. Save changes
+            await self._merge_entities_done()
+
+            logger.info(
+                f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
+            )
+            return await self.get_entity_info(target_entity, include_vector_data=True)
+
+        except Exception as e:
+            logger.error(f"Error merging entities: {e}")
+            raise
+
+    def merge_entities(
+        self,
+        source_entities: list[str],
+        target_entity: str,
+        merge_strategy: dict[str, str] | None = None,
+        target_entity_data: dict[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """Synchronously merge multiple entities into one entity.
+
+        Merges multiple source entities into a target entity, handling all
+        relationships and updating both the knowledge graph and the vector database.
+
+        Args:
+            source_entities: List of source entity names to merge
+            target_entity: Name of the target entity after merging
+            merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
+            target_entity_data: Dictionary of specific values to set for the target entity,
+                overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
+
+        Returns:
+            Dictionary containing the merged entity information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.amerge_entities(
+                source_entities, target_entity, merge_strategy, target_entity_data
+            )
+        )
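+
+    # The two strategy helpers below are shared by entity and relationship
+    # merging. Illustrative behavior (hypothetical values): weights 0.6 and 0.9
+    # under "max" resolve to 0.9; keywords "a<SEP>b" and "b<SEP>c" under
+    # "join_unique" resolve to "a<SEP>b<SEP>c" (unique items, sorted, joined
+    # with GRAPH_FIELD_SEP).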
+
+    def _merge_entity_attributes(
+        self, entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
+    ) -> dict[str, Any]:
+        """Merge attributes from multiple entities.
+
+        Args:
+            entity_data_list: List of dictionaries containing entity data
+            merge_strategy: Merge strategy for each field
+
+        Returns:
+            Dictionary containing merged entity data
+        """
+        merged_data = {}
+
+        # Collect all possible keys
+        all_keys = set()
+        for data in entity_data_list:
+            all_keys.update(data.keys())
+
+        # Merge values for each key
+        for key in all_keys:
+            # Get all non-empty values for this key
+            values = [data.get(key) for data in entity_data_list if data.get(key)]
+
+            if not values:
+                continue
+
+            # Merge values according to the strategy
+            strategy = merge_strategy.get(key, "keep_first")
+
+            if strategy == "concatenate":
+                merged_data[key] = "\n\n".join(values)
+            elif strategy == "keep_first":
+                merged_data[key] = values[0]
+            elif strategy == "keep_last":
+                merged_data[key] = values[-1]
+            elif strategy == "join_unique":
+                # Handle fields separated by GRAPH_FIELD_SEP; sort the unique
+                # items so the merged value is deterministic
+                unique_items = set()
+                for value in values:
+                    items = value.split(GRAPH_FIELD_SEP)
+                    unique_items.update(items)
+                merged_data[key] = GRAPH_FIELD_SEP.join(sorted(unique_items))
+            else:
+                # Default strategy
+                merged_data[key] = values[0]
+
+        return merged_data
+
+    def _merge_relation_attributes(
+        self, relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
+    ) -> dict[str, Any]:
+        """Merge attributes from multiple relationships.
+
+        Args:
+            relation_data_list: List of dictionaries containing relationship data
+            merge_strategy: Merge strategy for each field
+
+        Returns:
+            Dictionary containing merged relationship data
+        """
+        merged_data = {}
+
+        # Collect all possible keys
+        all_keys = set()
+        for data in relation_data_list:
+            all_keys.update(data.keys())
+
+        # Merge values for each key
+        for key in all_keys:
+            # Get all non-None values for this key
+            values = [
+                data.get(key)
+                for data in relation_data_list
+                if data.get(key) is not None
+            ]
+
+            if not values:
+                continue
+
+            # Merge values according to the strategy
+            strategy = merge_strategy.get(key, "keep_first")
+
+            if strategy == "concatenate":
+                merged_data[key] = "\n\n".join(str(v) for v in values)
+            elif strategy == "keep_first":
+                merged_data[key] = values[0]
+            elif strategy == "keep_last":
+                merged_data[key] = values[-1]
+            elif strategy == "join_unique":
+                # Handle fields separated by GRAPH_FIELD_SEP; sort for determinism
+                unique_items = set()
+                for value in values:
+                    items = str(value).split(GRAPH_FIELD_SEP)
+                    unique_items.update(items)
+                merged_data[key] = GRAPH_FIELD_SEP.join(sorted(unique_items))
+            elif strategy == "max":
+                # For numeric fields such as weight
+                try:
+                    merged_data[key] = max(float(v) for v in values)
+                except (ValueError, TypeError):
+                    merged_data[key] = values[0]
+            else:
+                # Default strategy
+                merged_data[key] = values[0]
+
+        return merged_data
+
+    async def _merge_entities_done(self) -> None:
+        """Callback after entity merging is complete; ensures updates are persisted."""
+        await asyncio.gather(
+            *[
+                cast(StorageNameSpace, storage_inst).index_done_callback()
+                for storage_inst in [  # type: ignore
+                    self.entities_vdb,
+                    self.relationships_vdb,
+                    self.chunk_entity_relation_graph,
+                ]
+            ]
+        )