Merge branch 'main' into add-graph-search-mode

This commit is contained in:
yangdx
2025-03-06 10:23:38 +08:00
17 changed files with 2385 additions and 90 deletions

View File

@@ -1453,6 +1453,68 @@ class LightRAG:
]
)
def delete_by_relation(self, source_entity: str, target_entity: str) -> None:
"""Synchronously delete a relation between two entities.
Args:
source_entity: Name of the source entity
target_entity: Name of the target entity
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.adelete_by_relation(source_entity, target_entity)
)
async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
"""Asynchronously delete a relation between two entities.
Args:
source_entity: Name of the source entity
target_entity: Name of the target entity
"""
try:
# Check if the relation exists
edge_exists = await self.chunk_entity_relation_graph.has_edge(
source_entity, target_entity
)
if not edge_exists:
logger.warning(
f"Relation from '{source_entity}' to '{target_entity}' does not exist"
)
return
# Delete relation from vector database
relation_id = compute_mdhash_id(
source_entity + target_entity, prefix="rel-"
)
await self.relationships_vdb.delete([relation_id])
# Delete relation from knowledge graph
await self.chunk_entity_relation_graph.remove_edges(
[(source_entity, target_entity)]
)
logger.info(
f"Successfully deleted relation from '{source_entity}' to '{target_entity}'"
)
await self._delete_relation_done()
except Exception as e:
logger.error(
f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}"
)
async def _delete_relation_done(self) -> None:
"""Callback after relation deletion is complete"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
self.relationships_vdb,
self.chunk_entity_relation_graph,
]
]
)
def _get_content_summary(self, content: str, max_length: int = 100) -> str:
"""Get summary of document content
@@ -1547,51 +1609,57 @@ class LightRAG:
await self.text_chunks.delete(chunk_ids)
# 5. Find and process entities and relationships that have these chunks as source
# Get all nodes in the graph
nodes = self.chunk_entity_relation_graph._graph.nodes(data=True)
edges = self.chunk_entity_relation_graph._graph.edges(data=True)
# Track which entities and relationships need to be deleted or updated
# Get all nodes and edges from the graph storage using storage-agnostic methods
entities_to_delete = set()
entities_to_update = {} # entity_name -> new_source_id
relationships_to_delete = set()
relationships_to_update = {} # (src, tgt) -> new_source_id
# Process entities
for node, data in nodes:
if "source_id" in data:
# Process entities - use storage-agnostic methods
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for node_label in all_labels:
node_data = await self.chunk_entity_relation_graph.get_node(node_label)
if node_data and "source_id" in node_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(data["source_id"].split(GRAPH_FIELD_SEP))
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
sources.difference_update(chunk_ids)
if not sources:
entities_to_delete.add(node)
entities_to_delete.add(node_label)
logger.debug(
f"Entity {node} marked for deletion - no remaining sources"
f"Entity {node_label} marked for deletion - no remaining sources"
)
else:
new_source_id = GRAPH_FIELD_SEP.join(sources)
entities_to_update[node] = new_source_id
entities_to_update[node_label] = new_source_id
logger.debug(
f"Entity {node} will be updated with new source_id: {new_source_id}"
f"Entity {node_label} will be updated with new source_id: {new_source_id}"
)
# Process relationships
for src, tgt, data in edges:
if "source_id" in data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(data["source_id"].split(GRAPH_FIELD_SEP))
sources.difference_update(chunk_ids)
if not sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
else:
new_source_id = GRAPH_FIELD_SEP.join(sources)
relationships_to_update[(src, tgt)] = new_source_id
logger.debug(
f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}"
for node_label in all_labels:
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
node_label
)
if node_edges:
for src, tgt in node_edges:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
if edge_data and "source_id" in edge_data:
# Split source_id using GRAPH_FIELD_SEP
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
sources.difference_update(chunk_ids)
if not sources:
relationships_to_delete.add((src, tgt))
logger.debug(
f"Relationship {src}-{tgt} marked for deletion - no remaining sources"
)
else:
new_source_id = GRAPH_FIELD_SEP.join(sources)
relationships_to_update[(src, tgt)] = new_source_id
logger.debug(
f"Relationship {src}-{tgt} will be updated with new source_id: {new_source_id}"
)
# Delete entities
if entities_to_delete:
@@ -1605,12 +1673,15 @@ class LightRAG:
# Update entities
for entity, new_source_id in entities_to_update.items():
node_data = self.chunk_entity_relation_graph._graph.nodes[entity]
node_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_node(entity, node_data)
logger.debug(
f"Updated entity {entity} with new source_id: {new_source_id}"
)
node_data = await self.chunk_entity_relation_graph.get_node(entity)
if node_data:
node_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_node(
entity, node_data
)
logger.debug(
f"Updated entity {entity} with new source_id: {new_source_id}"
)
# Delete relationships
if relationships_to_delete:
@@ -1628,12 +1699,15 @@ class LightRAG:
# Update relationships
for (src, tgt), new_source_id in relationships_to_update.items():
edge_data = self.chunk_entity_relation_graph._graph.edges[src, tgt]
edge_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_edge(src, tgt, edge_data)
logger.debug(
f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}"
)
edge_data = await self.chunk_entity_relation_graph.get_edge(src, tgt)
if edge_data:
edge_data["source_id"] = new_source_id
await self.chunk_entity_relation_graph.upsert_edge(
src, tgt, edge_data
)
logger.debug(
f"Updated relationship {src}-{tgt} with new source_id: {new_source_id}"
)
# 6. Delete original document and status
await self.full_docs.delete([doc_id])
@@ -1925,6 +1999,9 @@ class LightRAG:
new_entity_name, new_node_data
)
# Store relationships that need to be updated
relations_to_update = []
# Get all edges related to the original entity
edges = await self.chunk_entity_relation_graph.get_node_edges(
entity_name
@@ -1940,10 +2017,16 @@ class LightRAG:
await self.chunk_entity_relation_graph.upsert_edge(
new_entity_name, target, edge_data
)
relations_to_update.append(
(new_entity_name, target, edge_data)
)
else: # target == entity_name
await self.chunk_entity_relation_graph.upsert_edge(
source, new_entity_name, edge_data
)
relations_to_update.append(
(source, new_entity_name, edge_data)
)
# Delete old entity
await self.chunk_entity_relation_graph.delete_node(entity_name)
@@ -1952,6 +2035,35 @@ class LightRAG:
old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await self.entities_vdb.delete([old_entity_id])
# Update relationship vector representations
for src, tgt, edge_data in relations_to_update:
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
# Create new content for embedding
content = f"{src}\t{tgt}\n{keywords}\n{description}"
# Calculate relationship ID
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
# Prepare data for vector database update
relation_data = {
relation_id: {
"content": content,
"src_id": src,
"tgt_id": tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
"weight": weight,
}
}
# Update vector database
await self.relationships_vdb.upsert(relation_data)
# Update working entity name to new name
entity_name = new_entity_name
else:
@@ -2062,7 +2174,7 @@ class LightRAG:
weight = float(new_edge_data.get("weight", 1.0))
# Create content for embedding
content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}"
# Calculate relation ID
relation_id = compute_mdhash_id(
@@ -2326,3 +2438,392 @@ class LightRAG:
return loop.run_until_complete(
self.acreate_relation(source_entity, target_entity, relation_data)
)
async def amerge_entities(
self,
source_entities: list[str],
target_entity: str,
merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None,
) -> dict[str, Any]:
"""Asynchronously merge multiple entities into one entity.
Merges multiple source entities into a target entity, handling all relationships,
and updating both the knowledge graph and vector database.
Args:
source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging
merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
Supported strategies:
- "concatenate": Concatenate all values (for text fields)
- "keep_first": Keep the first non-empty value
- "keep_last": Keep the last non-empty value
- "join_unique": Join all unique values (for fields separated by delimiter)
target_entity_data: Dictionary of specific values to set for the target entity,
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
Returns:
Dictionary containing the merged entity information
"""
try:
# Default merge strategy
default_strategy = {
"description": "concatenate",
"entity_type": "keep_first",
"source_id": "join_unique",
}
merge_strategy = (
default_strategy
if merge_strategy is None
else {**default_strategy, **merge_strategy}
)
target_entity_data = (
{} if target_entity_data is None else target_entity_data
)
# 1. Check if all source entities exist
source_entities_data = {}
for entity_name in source_entities:
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
if not node_data:
raise ValueError(f"Source entity '{entity_name}' does not exist")
source_entities_data[entity_name] = node_data
# 2. Check if target entity exists and get its data if it does
target_exists = await self.chunk_entity_relation_graph.has_node(
target_entity
)
target_entity_data = {}
if target_exists:
target_entity_data = await self.chunk_entity_relation_graph.get_node(
target_entity
)
logger.info(
f"Target entity '{target_entity}' already exists, will merge data"
)
# 3. Merge entity data
merged_entity_data = self._merge_entity_attributes(
list(source_entities_data.values())
+ ([target_entity_data] if target_exists else []),
merge_strategy,
)
# Apply any explicitly provided target entity data (overrides merged data)
for key, value in target_entity_data.items():
merged_entity_data[key] = value
# 4. Get all relationships of the source entities
all_relations = []
for entity_name in source_entities:
# Get all relationships where this entity is the source
outgoing_edges = await self.chunk_entity_relation_graph.get_node_edges(
entity_name
)
if outgoing_edges:
for src, tgt in outgoing_edges:
# Ensure src is the current entity
if src == entity_name:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
all_relations.append(("outgoing", src, tgt, edge_data))
# Get all relationships where this entity is the target
incoming_edges = []
all_labels = await self.chunk_entity_relation_graph.get_all_labels()
for label in all_labels:
if label == entity_name:
continue
node_edges = await self.chunk_entity_relation_graph.get_node_edges(
label
)
for src, tgt in node_edges or []:
if tgt == entity_name:
incoming_edges.append((src, tgt))
for src, tgt in incoming_edges:
edge_data = await self.chunk_entity_relation_graph.get_edge(
src, tgt
)
all_relations.append(("incoming", src, tgt, edge_data))
# 5. Create or update the target entity
if not target_exists:
await self.chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data
)
logger.info(f"Created new target entity '{target_entity}'")
else:
await self.chunk_entity_relation_graph.upsert_node(
target_entity, merged_entity_data
)
logger.info(f"Updated existing target entity '{target_entity}'")
# 6. Recreate all relationships, pointing to the target entity
relation_updates = {} # Track relationships that need to be merged
for rel_type, src, tgt, edge_data in all_relations:
new_src = target_entity if src in source_entities else src
new_tgt = target_entity if tgt in source_entities else tgt
# Skip relationships between source entities to avoid self-loops
if new_src == new_tgt:
logger.info(
f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop"
)
continue
# Check if the same relationship already exists
relation_key = f"{new_src}|{new_tgt}"
if relation_key in relation_updates:
# Merge relationship data
existing_data = relation_updates[relation_key]["data"]
merged_relation = self._merge_relation_attributes(
[existing_data, edge_data],
{
"description": "concatenate",
"keywords": "join_unique",
"source_id": "join_unique",
"weight": "max",
},
)
relation_updates[relation_key]["data"] = merged_relation
logger.info(
f"Merged duplicate relationship: {new_src} -> {new_tgt}"
)
else:
relation_updates[relation_key] = {
"src": new_src,
"tgt": new_tgt,
"data": edge_data.copy(),
}
# Apply relationship updates
for rel_data in relation_updates.values():
await self.chunk_entity_relation_graph.upsert_edge(
rel_data["src"], rel_data["tgt"], rel_data["data"]
)
logger.info(
f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}"
)
# 7. Update entity vector representation
description = merged_entity_data.get("description", "")
source_id = merged_entity_data.get("source_id", "")
entity_type = merged_entity_data.get("entity_type", "")
content = target_entity + "\n" + description
entity_id = compute_mdhash_id(target_entity, prefix="ent-")
entity_data_for_vdb = {
entity_id: {
"content": content,
"entity_name": target_entity,
"source_id": source_id,
"description": description,
"entity_type": entity_type,
}
}
await self.entities_vdb.upsert(entity_data_for_vdb)
# 8. Update relationship vector representations
for rel_data in relation_updates.values():
src = rel_data["src"]
tgt = rel_data["tgt"]
edge_data = rel_data["data"]
description = edge_data.get("description", "")
keywords = edge_data.get("keywords", "")
source_id = edge_data.get("source_id", "")
weight = float(edge_data.get("weight", 1.0))
content = f"{keywords}\t{src}\n{tgt}\n{description}"
relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
relation_data_for_vdb = {
relation_id: {
"content": content,
"src_id": src,
"tgt_id": tgt,
"source_id": source_id,
"description": description,
"keywords": keywords,
"weight": weight,
}
}
await self.relationships_vdb.upsert(relation_data_for_vdb)
# 9. Delete source entities
for entity_name in source_entities:
# Delete entity node
await self.chunk_entity_relation_graph.delete_node(entity_name)
# Delete record from vector database
entity_id = compute_mdhash_id(entity_name, prefix="ent-")
await self.entities_vdb.delete([entity_id])
logger.info(f"Deleted source entity '{entity_name}'")
# 10. Save changes
await self._merge_entities_done()
logger.info(
f"Successfully merged {len(source_entities)} entities into '{target_entity}'"
)
return await self.get_entity_info(target_entity, include_vector_data=True)
except Exception as e:
logger.error(f"Error merging entities: {e}")
raise
def merge_entities(
self,
source_entities: list[str],
target_entity: str,
merge_strategy: dict[str, str] = None,
target_entity_data: dict[str, Any] = None,
) -> dict[str, Any]:
"""Synchronously merge multiple entities into one entity.
Merges multiple source entities into a target entity, handling all relationships,
and updating both the knowledge graph and vector database.
Args:
source_entities: List of source entity names to merge
target_entity: Name of the target entity after merging
merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"}
target_entity_data: Dictionary of specific values to set for the target entity,
overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"}
Returns:
Dictionary containing the merged entity information
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.amerge_entities(
source_entities, target_entity, merge_strategy, target_entity_data
)
)
def _merge_entity_attributes(
self, entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
) -> dict[str, Any]:
"""Merge attributes from multiple entities.
Args:
entity_data_list: List of dictionaries containing entity data
merge_strategy: Merge strategy for each field
Returns:
Dictionary containing merged entity data
"""
merged_data = {}
# Collect all possible keys
all_keys = set()
for data in entity_data_list:
all_keys.update(data.keys())
# Merge values for each key
for key in all_keys:
# Get all values for this key
values = [data.get(key) for data in entity_data_list if data.get(key)]
if not values:
continue
# Merge values according to strategy
strategy = merge_strategy.get(key, "keep_first")
if strategy == "concatenate":
merged_data[key] = "\n\n".join(values)
elif strategy == "keep_first":
merged_data[key] = values[0]
elif strategy == "keep_last":
merged_data[key] = values[-1]
elif strategy == "join_unique":
# Handle fields separated by GRAPH_FIELD_SEP
unique_items = set()
for value in values:
items = value.split(GRAPH_FIELD_SEP)
unique_items.update(items)
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
else:
# Default strategy
merged_data[key] = values[0]
return merged_data
def _merge_relation_attributes(
self, relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str]
) -> dict[str, Any]:
"""Merge attributes from multiple relationships.
Args:
relation_data_list: List of dictionaries containing relationship data
merge_strategy: Merge strategy for each field
Returns:
Dictionary containing merged relationship data
"""
merged_data = {}
# Collect all possible keys
all_keys = set()
for data in relation_data_list:
all_keys.update(data.keys())
# Merge values for each key
for key in all_keys:
# Get all values for this key
values = [
data.get(key)
for data in relation_data_list
if data.get(key) is not None
]
if not values:
continue
# Merge values according to strategy
strategy = merge_strategy.get(key, "keep_first")
if strategy == "concatenate":
merged_data[key] = "\n\n".join(str(v) for v in values)
elif strategy == "keep_first":
merged_data[key] = values[0]
elif strategy == "keep_last":
merged_data[key] = values[-1]
elif strategy == "join_unique":
# Handle fields separated by GRAPH_FIELD_SEP
unique_items = set()
for value in values:
items = str(value).split(GRAPH_FIELD_SEP)
unique_items.update(items)
merged_data[key] = GRAPH_FIELD_SEP.join(unique_items)
elif strategy == "max":
# For numeric fields like weight
try:
merged_data[key] = max(float(v) for v in values)
except (ValueError, TypeError):
merged_data[key] = values[0]
else:
# Default strategy
merged_data[key] = values[0]
return merged_data
async def _merge_entities_done(self) -> None:
"""Callback after entity merging is complete, ensures updates are persisted"""
await asyncio.gather(
*[
cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore
self.entities_vdb,
self.relationships_vdb,
self.chunk_entity_relation_graph,
]
]
)