Add merge entities
This commit is contained in:
70
README.md
70
README.md
@@ -849,6 +849,76 @@ All operations are available in both synchronous and asynchronous versions. The
|
||||
|
||||
These operations maintain data consistency across both the graph database and vector database components, ensuring your knowledge graph remains coherent.
|
||||
|
||||
## Entity Merging
|
||||
|
||||
<details>
|
||||
<summary> <b>Merge Entities and Their Relationships</b> </summary>
|
||||
|
||||
LightRAG now supports merging multiple entities into a single entity, automatically handling all relationships:
|
||||
|
||||
```python
|
||||
# Basic entity merging
|
||||
rag.merge_entities(
|
||||
source_entities=["Artificial Intelligence", "AI", "Machine Intelligence"],
|
||||
target_entity="AI Technology"
|
||||
)
|
||||
```
|
||||
|
||||
With custom merge strategy:
|
||||
|
||||
```python
|
||||
# Define custom merge strategy for different fields
|
||||
rag.merge_entities(
|
||||
source_entities=["John Smith", "Dr. Smith", "J. Smith"],
|
||||
target_entity="John Smith",
|
||||
merge_strategy={
|
||||
"description": "concatenate", # Combine all descriptions
|
||||
"entity_type": "keep_first", # Keep the entity type from the first entity
|
||||
"source_id": "join_unique" # Combine all unique source IDs
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
With custom target entity data:
|
||||
|
||||
```python
|
||||
# Specify exact values for the merged entity
|
||||
rag.merge_entities(
|
||||
source_entities=["New York", "NYC", "Big Apple"],
|
||||
target_entity="New York City",
|
||||
target_entity_data={
|
||||
"entity_type": "LOCATION",
|
||||
"description": "New York City is the most populous city in the United States.",
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
Advanced usage combining both approaches:
|
||||
|
||||
```python
|
||||
# Merge company entities with both strategy and custom data
|
||||
rag.merge_entities(
|
||||
source_entities=["Microsoft Corp", "Microsoft Corporation", "MSFT"],
|
||||
target_entity="Microsoft",
|
||||
merge_strategy={
|
||||
"description": "concatenate", # Combine all descriptions
|
||||
"source_id": "join_unique" # Combine source IDs
|
||||
},
|
||||
target_entity_data={
|
||||
"entity_type": "ORGANIZATION",
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
When merging entities:
|
||||
* All relationships from source entities are redirected to the target entity
|
||||
* Duplicate relationships are intelligently merged
|
||||
* Self-relationships (loops) are prevented
|
||||
* Source entities are removed after merging
|
||||
* Relationship weights and attributes are preserved
|
||||
|
||||
</details>
|
||||
|
||||
## Cache
|
||||
|
||||
<details>
|
||||
|
@@ -2420,3 +2420,392 @@ class LightRAG:
|
||||
return loop.run_until_complete(
|
||||
self.acreate_relation(source_entity, target_entity, relation_data)
|
||||
)
|
||||
|
||||
async def amerge_entities(
|
||||
self,
|
||||
source_entities: list[str],
|
||||
target_entity: str,
|
||||
merge_strategy: dict[str, str] = None,
|
||||
target_entity_data: dict[str, Any] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Asynchronously merge multiple entities into one entity.
|
||||
|
||||
Merges multiple source entities into a target entity, handling all relationships,
|
||||
and updating both the knowledge graph and vector database.
|
||||
|
||||
Args:
|
||||
source_entities: List of source entity names to merge
|
||||
target_entity: Name of the target entity after merging
|
||||
merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
|
||||
Supported strategies:
|
||||
- "concatenate": Concatenate all values (for text fields)
|
||||
- "keep_first": Keep the first non-empty value
|
||||
- "keep_last": Keep the last non-empty value
|
||||
- "join_unique": Join all unique values (for fields separated by delimiter)
|
||||
target_entity_data: Dictionary of specific values to set for the target entity,
|
||||
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
|
||||
|
||||
Returns:
|
||||
Dictionary containing the merged entity information
|
||||
"""
|
||||
try:
|
||||
# Default merge strategy
|
||||
default_strategy = {
|
||||
"description": "concatenate",
|
||||
"entity_type": "keep_first",
|
||||
"source_id": "join_unique",
|
||||
}
|
||||
|
||||
merge_strategy = (
|
||||
default_strategy
|
||||
if merge_strategy is None
|
||||
else {**default_strategy, **merge_strategy}
|
||||
)
|
||||
target_entity_data = (
|
||||
{} if target_entity_data is None else target_entity_data
|
||||
)
|
||||
|
||||
# 1. Check if all source entities exist
|
||||
source_entities_data = {}
|
||||
for entity_name in source_entities:
|
||||
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
|
||||
if not node_data:
|
||||
raise ValueError(f"Source entity '{entity_name}' does not exist")
|
||||
source_entities_data[entity_name] = node_data
|
||||
|
||||
# 2. Check if target entity exists and get its data if it does
|
||||
target_exists = await self.chunk_entity_relation_graph.has_node(
|
||||
target_entity
|
||||
)
|
||||
target_entity_data = {}
|
||||
if target_exists:
|
||||
target_entity_data = await self.chunk_entity_relation_graph.get_node(
|
||||
target_entity
|
||||
)
|
||||
logger.info(
|
||||
f"Target entity '{target_entity}' already exists, will merge data"
|
||||
)
|
||||
|
||||
# 3. Merge entity data
|
||||
merged_entity_data = self._merge_entity_attributes(
|
||||
list(source_entities_data.values())
|
||||
+ ([target_entity_data] if target_exists else []),
|
||||
merge_strategy,
|
||||
)
|
||||
|
||||
# Apply any explicitly provided target entity data (overrides merged data)
|
||||
for key, value in target_entity_data.items():
|
||||
merged_entity_data[key] = value
|
||||
|
||||
# 4. Get all relationships of the source entities
|
||||
all_relations = []
|
||||
for entity_name in source_entities:
|
||||
# Get all relationships where this entity is the source
|
||||
outgoing_edges = await self.chunk_entity_relation_graph.get_node_edges(
|
||||
entity_name
|
||||
)
|
||||
if outgoing_edges:
|
||||
for src, tgt in outgoing_edges:
|
||||
# Ensure src is the current entity
|
||||
if src == entity_name:
|
||||
edge_data = await self.chunk_entity_relation_graph.get_edge(
|
||||
src, tgt
|
||||
)
|
||||
all_relations.append(("outgoing", src, tgt, edge_data))
|
||||
|
||||
# Get all relationships where this entity is the target
|
||||
incoming_edges = []
|
||||
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
|
||||
for label in all_labels:
|
||||
if label == entity_name:
|
||||
continue
|
||||
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
|
||||
label
|
||||
)
|
||||
for src, tgt in node_edges or []:
|
||||
if tgt == entity_name:
|
||||
incoming_edges.append((src, tgt))
|
||||
|
||||
for src, tgt in incoming_edges:
|
||||
edge_data = await self.chunk_entity_relation_graph.get_edge(
|
||||
src, tgt
|
||||
)
|
||||
all_relations.append(("incoming", src, tgt, edge_data))
|
||||
|
||||
# 5. Create or update the target entity
|
||||
if not target_exists:
|
||||
await self.chunk_entity_relation_graph.upsert_node(
|
||||
target_entity, merged_entity_data
|
||||
)
|
||||
logger.info(f"Created new target entity '{target_entity}'")
|
||||
else:
|
||||
await self.chunk_entity_relation_graph.upsert_node(
|
||||
target_entity, merged_entity_data
|
||||
)
|
||||
logger.info(f"Updated existing target entity '{target_entity}'")
|
||||
|
||||
# 6. Recreate all relationships, pointing to the target entity
|
||||
relation_updates = {} # Track relationships that need to be merged
|
||||
|
||||
for rel_type, src, tgt, edge_data in all_relations:
|
||||
new_src = target_entity if src in source_entities else src
|
||||
new_tgt = target_entity if tgt in source_entities else tgt
|
||||
|
||||
# Skip relationships between source entities to avoid self-loops
|
||||
if new_src == new_tgt:
|
||||
logger.info(
|
||||
f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
|
||||
)
|
||||
continue
|
||||
|
||||
# Check if the same relationship already exists
|
||||
relation_key = f"{new_src}|{new_tgt}"
|
||||
if relation_key in relation_updates:
|
||||
# Merge relationship data
|
||||
existing_data = relation_updates[relation_key]["data"]
|
||||
merged_relation = self._merge_relation_attributes(
|
||||
[existing_data, edge_data],
|
||||
{
|
||||
"description": "concatenate",
|
||||
"keywords": "join_unique",
|
||||
"source_id": "join_unique",
|
||||
"weight": "max",
|
||||
},
|
||||
)
|
||||
relation_updates[relation_key]["data"] = merged_relation
|
||||
logger.info(
|
||||
f"Merged duplicate relationship: {new_src} -> {new_tgt}"
|
||||
)
|
||||
else:
|
||||
relation_updates[relation_key] = {
|
||||
"src": new_src,
|
||||
"tgt": new_tgt,
|
||||
"data": edge_data.copy(),
|
||||
}
|
||||
|
||||
# Apply relationship updates
|
||||
for rel_data in relation_updates.values():
|
||||
await self.chunk_entity_relation_graph.upsert_edge(
|
||||
rel_data["src"], rel_data["tgt"], rel_data["data"]
|
||||
)
|
||||
logger.info(
|
||||
f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
|
||||
)
|
||||
|
||||
# 7. Update entity vector representation
|
||||
description = merged_entity_data.get("description", "")
|
||||
source_id = merged_entity_data.get("source_id", "")
|
||||
entity_type = merged_entity_data.get("entity_type", "")
|
||||
content = target_entity + "\n" + description
|
||||
|
||||
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
|
||||
entity_data_for_vdb = {
|
||||
entity_id: {
|
||||
"content": content,
|
||||
"entity_name": target_entity,
|
||||
"source_id": source_id,
|
||||
"description": description,
|
||||
"entity_type": entity_type,
|
||||
}
|
||||
}
|
||||
|
||||
await self.entities_vdb.upsert(entity_data_for_vdb)
|
||||
|
||||
# 8. Update relationship vector representations
|
||||
for rel_data in relation_updates.values():
|
||||
src = rel_data["src"]
|
||||
tgt = rel_data["tgt"]
|
||||
edge_data = rel_data["data"]
|
||||
|
||||
description = edge_data.get("description", "")
|
||||
keywords = edge_data.get("keywords", "")
|
||||
source_id = edge_data.get("source_id", "")
|
||||
weight = float(edge_data.get("weight", 1.0))
|
||||
|
||||
content = f"{keywords}\t{src}\n{tgt}\n{description}"
|
||||
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
|
||||
|
||||
relation_data_for_vdb = {
|
||||
relation_id: {
|
||||
"content": content,
|
||||
"src_id": src,
|
||||
"tgt_id": tgt,
|
||||
"source_id": source_id,
|
||||
"description": description,
|
||||
"keywords": keywords,
|
||||
"weight": weight,
|
||||
}
|
||||
}
|
||||
|
||||
await self.relationships_vdb.upsert(relation_data_for_vdb)
|
||||
|
||||
# 9. Delete source entities
|
||||
for entity_name in source_entities:
|
||||
# Delete entity node
|
||||
await self.chunk_entity_relation_graph.delete_node(entity_name)
|
||||
# Delete record from vector database
|
||||
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
|
||||
await self.entities_vdb.delete([entity_id])
|
||||
logger.info(f"Deleted source entity '{entity_name}'")
|
||||
|
||||
# 10. Save changes
|
||||
await self._merge_entities_done()
|
||||
|
||||
logger.info(
|
||||
f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
|
||||
)
|
||||
return await self.get_entity_info(target_entity, include_vector_data=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error merging entities: {e}")
|
||||
raise
|
||||
|
||||
def merge_entities(
|
||||
self,
|
||||
source_entities: list[str],
|
||||
target_entity: str,
|
||||
merge_strategy: dict[str, str] = None,
|
||||
target_entity_data: dict[str, Any] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Synchronously merge multiple entities into one entity.
|
||||
|
||||
Merges multiple source entities into a target entity, handling all relationships,
|
||||
and updating both the knowledge graph and vector database.
|
||||
|
||||
Args:
|
||||
source_entities: List of source entity names to merge
|
||||
target_entity: Name of the target entity after merging
|
||||
merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
|
||||
target_entity_data: Dictionary of specific values to set for the target entity,
|
||||
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
|
||||
|
||||
Returns:
|
||||
Dictionary containing the merged entity information
|
||||
"""
|
||||
loop = always_get_an_event_loop()
|
||||
return loop.run_until_complete(
|
||||
self.amerge_entities(
|
||||
source_entities, target_entity, merge_strategy, target_entity_data
|
||||
)
|
||||
)
|
||||
|
||||
def _merge_entity_attributes(
|
||||
self, entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
|
||||
) -> dict[str, Any]:
|
||||
"""Merge attributes from multiple entities.
|
||||
|
||||
Args:
|
||||
entity_data_list: List of dictionaries containing entity data
|
||||
merge_strategy: Merge strategy for each field
|
||||
|
||||
Returns:
|
||||
Dictionary containing merged entity data
|
||||
"""
|
||||
merged_data = {}
|
||||
|
||||
# Collect all possible keys
|
||||
all_keys = set()
|
||||
for data in entity_data_list:
|
||||
all_keys.update(data.keys())
|
||||
|
||||
# Merge values for each key
|
||||
for key in all_keys:
|
||||
# Get all values for this key
|
||||
values = [data.get(key) for data in entity_data_list if data.get(key)]
|
||||
|
||||
if not values:
|
||||
continue
|
||||
|
||||
# Merge values according to strategy
|
||||
strategy = merge_strategy.get(key, "keep_first")
|
||||
|
||||
if strategy == "concatenate":
|
||||
merged_data[key] = "\n\n".join(values)
|
||||
elif strategy == "keep_first":
|
||||
merged_data[key] = values[0]
|
||||
elif strategy == "keep_last":
|
||||
merged_data[key] = values[-1]
|
||||
elif strategy == "join_unique":
|
||||
# Handle fields separated by GRAPH_FIELD_SEP
|
||||
unique_items = set()
|
||||
for value in values:
|
||||
items = value.split(GRAPH_FIELD_SEP)
|
||||
unique_items.update(items)
|
||||
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
|
||||
else:
|
||||
# Default strategy
|
||||
merged_data[key] = values[0]
|
||||
|
||||
return merged_data
|
||||
|
||||
def _merge_relation_attributes(
|
||||
self, relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
|
||||
) -> dict[str, Any]:
|
||||
"""Merge attributes from multiple relationships.
|
||||
|
||||
Args:
|
||||
relation_data_list: List of dictionaries containing relationship data
|
||||
merge_strategy: Merge strategy for each field
|
||||
|
||||
Returns:
|
||||
Dictionary containing merged relationship data
|
||||
"""
|
||||
merged_data = {}
|
||||
|
||||
# Collect all possible keys
|
||||
all_keys = set()
|
||||
for data in relation_data_list:
|
||||
all_keys.update(data.keys())
|
||||
|
||||
# Merge values for each key
|
||||
for key in all_keys:
|
||||
# Get all values for this key
|
||||
values = [
|
||||
data.get(key)
|
||||
for data in relation_data_list
|
||||
if data.get(key) is not None
|
||||
]
|
||||
|
||||
if not values:
|
||||
continue
|
||||
|
||||
# Merge values according to strategy
|
||||
strategy = merge_strategy.get(key, "keep_first")
|
||||
|
||||
if strategy == "concatenate":
|
||||
merged_data[key] = "\n\n".join(str(v) for v in values)
|
||||
elif strategy == "keep_first":
|
||||
merged_data[key] = values[0]
|
||||
elif strategy == "keep_last":
|
||||
merged_data[key] = values[-1]
|
||||
elif strategy == "join_unique":
|
||||
# Handle fields separated by GRAPH_FIELD_SEP
|
||||
unique_items = set()
|
||||
for value in values:
|
||||
items = str(value).split(GRAPH_FIELD_SEP)
|
||||
unique_items.update(items)
|
||||
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
|
||||
elif strategy == "max":
|
||||
# For numeric fields like weight
|
||||
try:
|
||||
merged_data[key] = max(float(v) for v in values)
|
||||
except (ValueError, TypeError):
|
||||
merged_data[key] = values[0]
|
||||
else:
|
||||
# Default strategy
|
||||
merged_data[key] = values[0]
|
||||
|
||||
return merged_data
|
||||
|
||||
async def _merge_entities_done(self) -> None:
|
||||
"""Callback after entity merging is complete, ensures updates are persisted"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
cast(StorageNameSpace, storage_inst).index_done_callback()
|
||||
for storage_inst in [ # type: ignore
|
||||
self.entities_vdb,
|
||||
self.relationships_vdb,
|
||||
self.chunk_entity_relation_graph,
|
||||
]
|
||||
]
|
||||
)
|
||||
|
Reference in New Issue
Block a user