Add a feature that allows modifying nodes and relationships.

README.md (+76)

@@ -750,6 +750,82 @@ rag.delete_by_entity("Project Gutenberg")
 rag.delete_by_doc_id("doc_id")
 ```
+
+## Edit Entities and Relations
+
+LightRAG now supports comprehensive knowledge graph management capabilities, allowing you to create, edit, and delete entities and relationships within your knowledge graph.
+
+### Create Entities and Relations
+
+```python
+import asyncio
+from lightrag import LightRAG
+
+# Initialize LightRAG
+rag = LightRAG(
+    working_dir="your_working_dir",
+    embedding_func=your_embedding_function,
+    llm_model_func=your_llm_function
+)
+
+# Create new entity
+entity = rag.create_entity("Google", {
+    "description": "Google is a multinational technology company specializing in internet-related services and products.",
+    "entity_type": "company"
+})
+
+# Create another entity
+product = rag.create_entity("Gmail", {
+    "description": "Gmail is an email service developed by Google.",
+    "entity_type": "product"
+})
+
+# Create relation between entities
+relation = rag.create_relation("Google", "Gmail", {
+    "description": "Google develops and operates Gmail.",
+    "keywords": "develops operates service",
+    "weight": 2.0
+})
+```
+
+### Edit Entities and Relations
+
+```python
+# Edit an existing entity
+updated_entity = rag.edit_entity("Google", {
+    "description": "Google is a subsidiary of Alphabet Inc., founded in 1998.",
+    "entity_type": "tech_company"
+})
+
+# Rename an entity (with all its relationships properly migrated)
+renamed_entity = rag.edit_entity("Gmail", {
+    "entity_name": "Google Mail",
+    "description": "Google Mail (formerly Gmail) is an email service."
+})
+
+# Edit a relation between entities
+updated_relation = rag.edit_relation("Google", "Google Mail", {
+    "description": "Google created and maintains Google Mail service.",
+    "keywords": "creates maintains email service",
+    "weight": 3.0
+})
+```
+
+All operations are available in both synchronous and asynchronous versions. The asynchronous versions have the prefix "a" (e.g., `acreate_entity`, `aedit_relation`).
+
+#### Entity Operations
+
+- **create_entity**: Creates a new entity with specified attributes
+- **edit_entity**: Updates an existing entity's attributes or renames it
+- **delete_entity**: Removes an entity and all its relationships
+
+#### Relation Operations
+
+- **create_relation**: Creates a new relation between existing entities
+- **edit_relation**: Updates an existing relation's attributes
+- **delete_relation**: Removes a relation between entities
+
+These operations maintain data consistency across both the graph database and vector database components, ensuring your knowledge graph remains coherent.

 ## Cache

 <details>
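
The README addition above documents the synchronous calls; a minimal sketch of the asynchronous variants it mentions (an assumption-laden example reusing the `rag` instance from the README snippet and an illustrative "YouTube" entity) might look like this:

```python
import asyncio

async def manage_graph():
    # Async counterparts carry the "a" prefix (acreate_entity, acreate_relation, aedit_relation).
    await rag.acreate_entity(
        "YouTube",
        {"description": "YouTube is a video platform operated by Google.", "entity_type": "product"},
    )
    await rag.acreate_relation(
        "Google",
        "YouTube",
        {"description": "Google operates YouTube.", "keywords": "operates video", "weight": 1.0},
    )
    # Later adjust the relation's weight without touching its other attributes.
    await rag.aedit_relation("Google", "YouTube", {"weight": 2.0})

asyncio.run(manage_graph())
```
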
@@ -967,7 +967,7 @@ class LightRAG:
             # Insert chunks into vector storage
             all_chunks_data: dict[str, dict[str, str]] = {}
             chunk_to_source_map: dict[str, str] = {}
-            for chunk_data in custom_kg.get("chunks", {}):
+            for chunk_data in custom_kg.get("chunks", []):
                 chunk_content = self.clean_text(chunk_data["content"])
                 source_id = chunk_data["source_id"]
                 tokens = len(
@@ -997,9 +997,10 @@ class LightRAG:
                 update_storage = True

             if all_chunks_data:
-                await self.chunks_vdb.upsert(all_chunks_data)
-            if all_chunks_data:
-                await self.text_chunks.upsert(all_chunks_data)
+                await asyncio.gather(
+                    self.chunks_vdb.upsert(all_chunks_data),
+                    self.text_chunks.upsert(all_chunks_data),
+                )

             # Insert entities into knowledge graph
             all_entities_data: list[dict[str, str]] = []
@@ -1007,7 +1008,6 @@ class LightRAG:
                 entity_name = entity_data["entity_name"]
                 entity_type = entity_data.get("entity_type", "UNKNOWN")
                 description = entity_data.get("description", "No description provided")
-                # source_id = entity_data["source_id"]
                 source_chunk_id = entity_data.get("source_id", "UNKNOWN")
                 source_id = chunk_to_source_map.get(source_chunk_id, "UNKNOWN")

@@ -1039,7 +1039,6 @@ class LightRAG:
                 description = relationship_data["description"]
                 keywords = relationship_data["keywords"]
                 weight = relationship_data.get("weight", 1.0)
-                # source_id = relationship_data["source_id"]
                 source_chunk_id = relationship_data.get("source_id", "UNKNOWN")
                 source_id = chunk_to_source_map.get(source_chunk_id, "UNKNOWN")

@@ -1079,34 +1078,43 @@ class LightRAG:
                     "tgt_id": tgt_id,
                     "description": description,
                     "keywords": keywords,
+                    "source_id": source_id,
+                    "weight": weight,
                 }
                 all_relationships_data.append(edge_data)
                 update_storage = True

-            # Insert entities into vector storage if needed
+            # Insert entities into vector storage with consistent format
             data_for_vdb = {
                 compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
-                    "content": dp["entity_name"] + dp["description"],
+                    "content": dp["entity_name"] + "\n" + dp["description"],
                     "entity_name": dp["entity_name"],
+                    "source_id": dp["source_id"],
+                    "description": dp["description"],
+                    "entity_type": dp["entity_type"],
                 }
                 for dp in all_entities_data
             }
             await self.entities_vdb.upsert(data_for_vdb)

-            # Insert relationships into vector storage if needed
+            # Insert relationships into vector storage with consistent format
             data_for_vdb = {
                 compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
                     "src_id": dp["src_id"],
                     "tgt_id": dp["tgt_id"],
-                    "content": dp["keywords"]
-                    + dp["src_id"]
-                    + dp["tgt_id"]
-                    + dp["description"],
+                    "source_id": dp["source_id"],
+                    "content": f"{dp['keywords']}\t{dp['src_id']}\n{dp['tgt_id']}\n{dp['description']}",
+                    "keywords": dp["keywords"],
+                    "description": dp["description"],
+                    "weight": dp["weight"],
                 }
                 for dp in all_relationships_data
             }
             await self.relationships_vdb.upsert(data_for_vdb)

+        except Exception as e:
+            logger.error(f"Error in ainsert_custom_kg: {e}")
+            raise
         finally:
             if update_storage:
                 await self._insert_done()
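
The "consistent format" comments above align the custom-KG insert path with the new create/edit methods: entity records are keyed by an "ent-" hash of the name and embed the name plus description, while relation records are keyed by a "rel-" hash of source+target and embed keywords, endpoints, and description. A rough sketch of that convention, assuming `compute_mdhash_id` is a prefix-plus-MD5 helper (an assumption; the helper's implementation is not shown in this diff), using the README's example values:

```python
from hashlib import md5


def mdhash_id(content: str, prefix: str = "") -> str:
    # Hypothetical stand-in for compute_mdhash_id: prefix + MD5 hex digest of the content.
    return prefix + md5(content.encode()).hexdigest()


# Entity record: ID derived from the name, content = name + "\n" + description.
entity_id = mdhash_id("Gmail", prefix="ent-")
entity_content = "Gmail" + "\n" + "Gmail is an email service developed by Google."

# Relation record: ID derived from src + tgt, content = keywords \t src \n tgt \n description.
src, tgt = "Google", "Gmail"
keywords = "develops operates service"
description = "Google develops and operates Gmail."
relation_id = mdhash_id(src + tgt, prefix="rel-")
relation_content = f"{keywords}\t{src}\n{tgt}\n{description}"
```
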
@@ -1759,3 +1767,461 @@ class LightRAG:
     def clear_cache(self, modes: list[str] | None = None) -> None:
         """Synchronous version of aclear_cache."""
         return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes))
+
+    async def aedit_entity(
+        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
+    ) -> dict[str, Any]:
+        """Asynchronously edit entity information.
+
+        Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
+
+        Args:
+            entity_name: Name of the entity to edit
+            updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
+            allow_rename: Whether to allow entity renaming, defaults to True
+
+        Returns:
+            Dictionary containing updated entity information
+        """
+        try:
+            # 1. Get current entity information
+            node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
+            if not node_data:
+                raise ValueError(f"Entity '{entity_name}' does not exist")
+
+            # Check if entity is being renamed
+            new_entity_name = updated_data.get("entity_name", entity_name)
+            is_renaming = new_entity_name != entity_name
+
+            # If renaming, check if new name already exists
+            if is_renaming:
+                if not allow_rename:
+                    raise ValueError(
+                        "Entity renaming is not allowed. Set allow_rename=True to enable this feature"
+                    )
+
+                existing_node = await self.chunk_entity_relation_graph.get_node(
+                    new_entity_name
+                )
+                if existing_node:
+                    raise ValueError(
+                        f"Entity name '{new_entity_name}' already exists, cannot rename"
+                    )
+
+            # 2. Update entity information in the graph
+            new_node_data = {**node_data, **updated_data}
+            if "entity_name" in new_node_data:
+                del new_node_data[
+                    "entity_name"
+                ]  # Node data should not contain entity_name field
+
+            # If renaming entity
+            if is_renaming:
+                logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'")
+
+                # Create new entity
+                await self.chunk_entity_relation_graph.upsert_node(
+                    new_entity_name, new_node_data
+                )
+
+                # Get all edges related to the original entity
+                edges = await self.chunk_entity_relation_graph.get_node_edges(
+                    entity_name
+                )
+                if edges:
+                    # Recreate edges for the new entity
+                    for source, target in edges:
+                        edge_data = await self.chunk_entity_relation_graph.get_edge(
+                            source, target
+                        )
+                        if edge_data:
+                            if source == entity_name:
+                                await self.chunk_entity_relation_graph.upsert_edge(
+                                    new_entity_name, target, edge_data
+                                )
+                            else:  # target == entity_name
+                                await self.chunk_entity_relation_graph.upsert_edge(
+                                    source, new_entity_name, edge_data
+                                )
+
+                # Delete old entity
+                await self.chunk_entity_relation_graph.delete_node(entity_name)
+
+                # Delete old entity record from vector database
+                old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+                await self.entities_vdb.delete([old_entity_id])
+
+                # Update working entity name to new name
+                entity_name = new_entity_name
+            else:
+                # If not renaming, directly update node data
+                await self.chunk_entity_relation_graph.upsert_node(
+                    entity_name, new_node_data
+                )
+
+            # 3. Recalculate entity's vector representation and update vector database
+            description = new_node_data.get("description", "")
+            source_id = new_node_data.get("source_id", "")
+            entity_type = new_node_data.get("entity_type", "")
+            content = entity_name + "\n" + description
+
+            # Calculate entity ID
+            entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+
+            # Prepare data for vector database update
+            entity_data = {
+                entity_id: {
+                    "content": content,
+                    "entity_name": entity_name,
+                    "source_id": source_id,
+                    "description": description,
+                    "entity_type": entity_type,
+                }
+            }
+
+            # Update vector database
+            await self.entities_vdb.upsert(entity_data)
+
+            # 4. Save changes
+            await self._edit_entity_done()
+
+            logger.info(f"Entity '{entity_name}' successfully updated")
+            return await self.get_entity_info(entity_name, include_vector_data=True)
+        except Exception as e:
+            logger.error(f"Error while editing entity '{entity_name}': {e}")
+            raise
+
+    def edit_entity(
+        self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
+    ) -> dict[str, Any]:
+        """Synchronously edit entity information.
+
+        Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
+
+        Args:
+            entity_name: Name of the entity to edit
+            updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
+            allow_rename: Whether to allow entity renaming, defaults to True
+
+        Returns:
+            Dictionary containing updated entity information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.aedit_entity(entity_name, updated_data, allow_rename)
+        )
+
+    async def _edit_entity_done(self) -> None:
+        """Callback after entity editing is complete, ensures updates are persisted"""
+        await asyncio.gather(
+            *[
+                cast(StorageNameSpace, storage_inst).index_done_callback()
+                for storage_inst in [  # type: ignore
+                    self.entities_vdb,
+                    self.chunk_entity_relation_graph,
+                ]
+            ]
+        )
+
+    async def aedit_relation(
+        self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Asynchronously edit relation information.
+
+        Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
+
+        Args:
+            source_entity: Name of the source entity
+            target_entity: Name of the target entity
+            updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"}
+
+        Returns:
+            Dictionary containing updated relation information
+        """
+        try:
+            # 1. Get current relation information
+            edge_data = await self.chunk_entity_relation_graph.get_edge(
+                source_entity, target_entity
+            )
+            if not edge_data:
+                raise ValueError(
+                    f"Relation from '{source_entity}' to '{target_entity}' does not exist"
+                )
+
+            # 2. Update relation information in the graph
+            new_edge_data = {**edge_data, **updated_data}
+            await self.chunk_entity_relation_graph.upsert_edge(
+                source_entity, target_entity, new_edge_data
+            )
+
+            # 3. Recalculate relation's vector representation and update vector database
+            description = new_edge_data.get("description", "")
+            keywords = new_edge_data.get("keywords", "")
+            source_id = new_edge_data.get("source_id", "")
+            weight = float(new_edge_data.get("weight", 1.0))
+
+            # Create content for embedding
+            content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
+
+            # Calculate relation ID
+            relation_id = compute_mdhash_id(
+                source_entity + target_entity, prefix="rel-"
+            )
+
+            # Prepare data for vector database update
+            relation_data = {
+                relation_id: {
+                    "content": content,
+                    "src_id": source_entity,
+                    "tgt_id": target_entity,
+                    "source_id": source_id,
+                    "description": description,
+                    "keywords": keywords,
+                    "weight": weight,
+                }
+            }
+
+            # Update vector database
+            await self.relationships_vdb.upsert(relation_data)
+
+            # 4. Save changes
+            await self._edit_relation_done()
+
+            logger.info(
+                f"Relation from '{source_entity}' to '{target_entity}' successfully updated"
+            )
+            return await self.get_relation_info(
+                source_entity, target_entity, include_vector_data=True
+            )
+        except Exception as e:
+            logger.error(
+                f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}"
+            )
+            raise
+
+    def edit_relation(
+        self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Synchronously edit relation information.
+
+        Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
+
+        Args:
+            source_entity: Name of the source entity
+            target_entity: Name of the target entity
+            updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "keywords"}
+
+        Returns:
+            Dictionary containing updated relation information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.aedit_relation(source_entity, target_entity, updated_data)
+        )
+
+    async def _edit_relation_done(self) -> None:
+        """Callback after relation editing is complete, ensures updates are persisted"""
+        await asyncio.gather(
+            *[
+                cast(StorageNameSpace, storage_inst).index_done_callback()
+                for storage_inst in [  # type: ignore
+                    self.relationships_vdb,
+                    self.chunk_entity_relation_graph,
+                ]
+            ]
+        )
+
+    async def acreate_entity(
+        self, entity_name: str, entity_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Asynchronously create a new entity.
+
+        Creates a new entity in the knowledge graph and adds it to the vector database.
+
+        Args:
+            entity_name: Name of the new entity
+            entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
+
+        Returns:
+            Dictionary containing created entity information
+        """
+        try:
+            # Check if entity already exists
+            existing_node = await self.chunk_entity_relation_graph.get_node(entity_name)
+            if existing_node:
+                raise ValueError(f"Entity '{entity_name}' already exists")
+
+            # Prepare node data with defaults if missing
+            node_data = {
+                "entity_type": entity_data.get("entity_type", "UNKNOWN"),
+                "description": entity_data.get("description", ""),
+                "source_id": entity_data.get("source_id", "manual"),
+            }
+
+            # Add entity to knowledge graph
+            await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data)
+
+            # Prepare content for entity
+            description = node_data.get("description", "")
+            source_id = node_data.get("source_id", "")
+            entity_type = node_data.get("entity_type", "")
+            content = entity_name + "\n" + description
+
+            # Calculate entity ID
+            entity_id = compute_mdhash_id(entity_name, prefix="ent-")
+
+            # Prepare data for vector database update
+            entity_data_for_vdb = {
+                entity_id: {
+                    "content": content,
+                    "entity_name": entity_name,
+                    "source_id": source_id,
+                    "description": description,
+                    "entity_type": entity_type,
+                }
+            }
+
+            # Update vector database
+            await self.entities_vdb.upsert(entity_data_for_vdb)
+
+            # Save changes
+            await self._edit_entity_done()
+
+            logger.info(f"Entity '{entity_name}' successfully created")
+            return await self.get_entity_info(entity_name, include_vector_data=True)
+        except Exception as e:
+            logger.error(f"Error while creating entity '{entity_name}': {e}")
+            raise
+
+    def create_entity(
+        self, entity_name: str, entity_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Synchronously create a new entity.
+
+        Creates a new entity in the knowledge graph and adds it to the vector database.
+
+        Args:
+            entity_name: Name of the new entity
+            entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}
+
+        Returns:
+            Dictionary containing created entity information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(self.acreate_entity(entity_name, entity_data))
+
+    async def acreate_relation(
+        self, source_entity: str, target_entity: str, relation_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Asynchronously create a new relation between entities.
+
+        Creates a new relation (edge) in the knowledge graph and adds it to the vector database.
+
+        Args:
+            source_entity: Name of the source entity
+            target_entity: Name of the target entity
+            relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
+
+        Returns:
+            Dictionary containing created relation information
+        """
+        try:
+            # Check if both entities exist
+            source_exists = await self.chunk_entity_relation_graph.has_node(
+                source_entity
+            )
+            target_exists = await self.chunk_entity_relation_graph.has_node(
+                target_entity
+            )
+
+            if not source_exists:
+                raise ValueError(f"Source entity '{source_entity}' does not exist")
+            if not target_exists:
+                raise ValueError(f"Target entity '{target_entity}' does not exist")
+
+            # Check if relation already exists
+            existing_edge = await self.chunk_entity_relation_graph.get_edge(
+                source_entity, target_entity
+            )
+            if existing_edge:
+                raise ValueError(
+                    f"Relation from '{source_entity}' to '{target_entity}' already exists"
+                )
+
+            # Prepare edge data with defaults if missing
+            edge_data = {
+                "description": relation_data.get("description", ""),
+                "keywords": relation_data.get("keywords", ""),
+                "source_id": relation_data.get("source_id", "manual"),
+                "weight": float(relation_data.get("weight", 1.0)),
+            }
+
+            # Add relation to knowledge graph
+            await self.chunk_entity_relation_graph.upsert_edge(
+                source_entity, target_entity, edge_data
+            )
+
+            # Prepare content for embedding
+            description = edge_data.get("description", "")
+            keywords = edge_data.get("keywords", "")
+            source_id = edge_data.get("source_id", "")
+            weight = edge_data.get("weight", 1.0)
+
+            # Create content for embedding
+            content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
+
+            # Calculate relation ID
+            relation_id = compute_mdhash_id(
+                source_entity + target_entity, prefix="rel-"
+            )
+
+            # Prepare data for vector database update
+            relation_data_for_vdb = {
+                relation_id: {
+                    "content": content,
+                    "src_id": source_entity,
+                    "tgt_id": target_entity,
+                    "source_id": source_id,
+                    "description": description,
+                    "keywords": keywords,
+                    "weight": weight,
+                }
+            }
+
+            # Update vector database
+            await self.relationships_vdb.upsert(relation_data_for_vdb)
+
+            # Save changes
+            await self._edit_relation_done()
+
+            logger.info(
+                f"Relation from '{source_entity}' to '{target_entity}' successfully created"
+            )
+            return await self.get_relation_info(
+                source_entity, target_entity, include_vector_data=True
+            )
+        except Exception as e:
+            logger.error(
+                f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}"
+            )
+            raise
+
+    def create_relation(
+        self, source_entity: str, target_entity: str, relation_data: dict[str, Any]
+    ) -> dict[str, Any]:
+        """Synchronously create a new relation between entities.
+
+        Creates a new relation (edge) in the knowledge graph and adds it to the vector database.
+
+        Args:
+            source_entity: Name of the source entity
+            target_entity: Name of the target entity
+            relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"}
+
+        Returns:
+            Dictionary containing created relation information
+        """
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.acreate_relation(source_entity, target_entity, relation_data)
+        )
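
Taken together with the README example, the rename branch in `aedit_entity` above recreates the node under the new name, re-attaches every edge, and removes the stale "ent-" vector record before deleting the old node. A small usage sketch (hypothetical setup reusing the README's `rag`, "Google", and "Gmail" objects, synchronous wrappers shown):

```python
# Rename the entity; its relationships are migrated to the new name.
renamed = rag.edit_entity("Gmail", {"entity_name": "Google Mail"})

# The migrated relation is now addressed by the new name.
rag.edit_relation("Google", "Google Mail", {"weight": 2.5})
```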