Add custom KG insertion
This commit is contained in:
@@ -308,6 +308,108 @@ class LightRAG:
|
||||
tasks.append(cast(StorageNameSpace, storage_inst).index_done_callback())
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
def insert_custom_kg(self, custom_kg: dict):
    """Synchronous entry point for inserting a custom knowledge graph.

    Runs ``self.ainsert_custom_kg(custom_kg)`` to completion on the loop
    returned by ``always_get_an_event_loop()`` and returns its result.
    """
    return always_get_an_event_loop().run_until_complete(
        self.ainsert_custom_kg(custom_kg)
    )
|
||||
|
||||
async def ainsert_custom_kg(self, custom_kg: dict) -> None:
    """Insert a caller-supplied knowledge graph into the graph and vector stores.

    ``custom_kg`` may carry two optional lists:

    * ``"entities"`` - each entry must provide ``entity_name`` and
      ``source_id``; ``entity_type`` defaults to ``"UNKNOWN"`` and
      ``description`` to ``"No description provided"``.
    * ``"relationships"`` - each entry must provide ``src_id``, ``tgt_id``,
      ``description``, ``keywords`` and ``source_id``; ``weight`` defaults
      to ``1.0``.

    Entity names and relationship endpoints are upper-cased and wrapped in
    double quotes before being used as node ids. A relationship endpoint
    that is not yet in the graph is created as a placeholder node whose
    description and entity type are ``"UNKNOWN"``.

    After the graph is updated, the collected entities and relationships are
    upserted into ``self.entities_vdb`` / ``self.relationships_vdb`` when
    those stores are set and there is something to write.

    Raises:
        KeyError: if a required key is missing from an entry (entries are
            expected to be dicts).
    """
    update_storage = False
    graph = self.chunk_entity_relation_graph
    try:
        # Insert entities into knowledge graph
        all_entities_data = []
        for entity_data in custom_kg.get("entities", []):
            entity_name = f'"{entity_data["entity_name"].upper()}"'
            entity_type = entity_data.get("entity_type", "UNKNOWN")
            description = entity_data.get("description", "No description provided")
            source_id = entity_data["source_id"]

            node_data = {
                "entity_type": entity_type,
                "description": description,
                "source_id": source_id,
            }
            await graph.upsert_node(entity_name, node_data=node_data)

            # Build a separate record for the vector store instead of
            # mutating ``node_data``: a graph backend that keeps a reference
            # to the dict would otherwise gain a stray "entity_name" key.
            all_entities_data.append({**node_data, "entity_name": entity_name})
            update_storage = True

        # Insert relationships into knowledge graph
        all_relationships_data = []
        for relationship_data in custom_kg.get("relationships", []):
            src_id = f'"{relationship_data["src_id"].upper()}"'
            tgt_id = f'"{relationship_data["tgt_id"].upper()}"'
            description = relationship_data["description"]
            keywords = relationship_data["keywords"]
            weight = relationship_data.get("weight", 1.0)
            source_id = relationship_data["source_id"]

            # Make sure both endpoints exist before the edge is written.
            for need_insert_id in (src_id, tgt_id):
                if await graph.has_node(need_insert_id):
                    continue
                await graph.upsert_node(
                    need_insert_id,
                    node_data={
                        "source_id": source_id,
                        "description": "UNKNOWN",
                        "entity_type": "UNKNOWN",
                    },
                )

            await graph.upsert_edge(
                src_id,
                tgt_id,
                edge_data={
                    "weight": weight,
                    "description": description,
                    "keywords": keywords,
                    "source_id": source_id,
                },
            )
            all_relationships_data.append(
                {
                    "src_id": src_id,
                    "tgt_id": tgt_id,
                    "description": description,
                    "keywords": keywords,
                }
            )
            update_storage = True

        # Insert entities into vector storage if needed. Skip the call when
        # nothing was collected so backends never receive an empty batch.
        if self.entities_vdb is not None and all_entities_data:
            data_for_vdb = {
                compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
                    "content": dp["entity_name"] + dp["description"],
                    "entity_name": dp["entity_name"],
                }
                for dp in all_entities_data
            }
            await self.entities_vdb.upsert(data_for_vdb)

        # Insert relationships into vector storage if needed (same guard).
        if self.relationships_vdb is not None and all_relationships_data:
            data_for_vdb = {
                compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
                    "src_id": dp["src_id"],
                    "tgt_id": dp["tgt_id"],
                    "content": dp["keywords"]
                    + dp["src_id"]
                    + dp["tgt_id"]
                    + dp["description"],
                }
                for dp in all_relationships_data
            }
            await self.relationships_vdb.upsert(data_for_vdb)
    finally:
        # Runs even if a later step raised, as long as at least one graph
        # write succeeded. NOTE(review): presumably _insert_done() flushes
        # the storages - confirm against its definition.
        if update_storage:
            await self._insert_done()
|
||||
|
||||
def query(self, query: str, param: QueryParam = QueryParam()):
    """Synchronous entry point for querying.

    Runs ``self.aquery(query, param)`` to completion on the loop returned
    by ``always_get_an_event_loop()`` and returns its result.
    """
    return always_get_an_event_loop().run_until_complete(
        self.aquery(query, param)
    )
|
||||
|
Reference in New Issue
Block a user