diff --git a/README.md b/README.md index 2b0d9f4b..a3f2152f 100644 --- a/README.md +++ b/README.md @@ -750,6 +750,82 @@ rag.delete_by_entity("Project Gutenberg") rag.delete_by_doc_id("doc_id") ``` +## Edit Entities and Relations + +LightRAG now supports comprehensive knowledge graph management capabilities, allowing you to create, edit, and delete entities and relationships within your knowledge graph. + +### Create Entities and Relations + +```python +import asyncio +from lightrag import LightRAG + +# Initialize LightRAG +rag = LightRAG( + working_dir="your_working_dir", + embedding_func=your_embedding_function, + llm_model_func=your_llm_function +) + +# Create new entity +entity = rag.create_entity("Google", { + "description": "Google is a multinational technology company specializing in internet-related services and products.", + "entity_type": "company" +}) + +# Create another entity +product = rag.create_entity("Gmail", { + "description": "Gmail is an email service developed by Google.", + "entity_type": "product" +}) + +# Create relation between entities +relation = rag.create_relation("Google", "Gmail", { + "description": "Google develops and operates Gmail.", + "keywords": "develops operates service", + "weight": 2.0 +}) +``` + +### Edit Entities and Relations + +```python +# Edit an existing entity +updated_entity = rag.edit_entity("Google", { + "description": "Google is a subsidiary of Alphabet Inc., founded in 1998.", + "entity_type": "tech_company" +}) + +# Rename an entity (with all its relationships properly migrated) +renamed_entity = rag.edit_entity("Gmail", { + "entity_name": "Google Mail", + "description": "Google Mail (formerly Gmail) is an email service." +}) + +# Edit a relation between entities +updated_relation = rag.edit_relation("Google", "Google Mail", { + "description": "Google created and maintains Google Mail service.", + "keywords": "creates maintains email service", + "weight": 3.0 +}) +``` + +All operations are available in both synchronous and asynchronous versions. The asynchronous versions have the prefix "a" (e.g., `acreate_entity`, `aedit_relation`). + +#### Entity Operations + +- **create_entity**: Creates a new entity with specified attributes +- **edit_entity**: Updates an existing entity's attributes or renames it +- **delete_entity**: Removes an entity and all its relationships + +#### Relation Operations + +- **create_relation**: Creates a new relation between existing entities +- **edit_relation**: Updates an existing relation's attributes +- **delete_relation**: Removes a relation between entities + +These operations maintain data consistency across both the graph database and vector database components, ensuring your knowledge graph remains coherent. + ## Cache
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 5470d2e2..8bcb6b51 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -967,7 +967,7 @@ class LightRAG: # Insert chunks into vector storage all_chunks_data: dict[str, dict[str, str]] = {} chunk_to_source_map: dict[str, str] = {} - for chunk_data in custom_kg.get("chunks", {}): + for chunk_data in custom_kg.get("chunks", []): chunk_content = self.clean_text(chunk_data["content"]) source_id = chunk_data["source_id"] tokens = len( @@ -997,9 +997,10 @@ class LightRAG: update_storage = True if all_chunks_data: - await self.chunks_vdb.upsert(all_chunks_data) - if all_chunks_data: - await self.text_chunks.upsert(all_chunks_data) + await asyncio.gather( + self.chunks_vdb.upsert(all_chunks_data), + self.text_chunks.upsert(all_chunks_data), + ) # Insert entities into knowledge graph all_entities_data: list[dict[str, str]] = [] @@ -1007,7 +1008,6 @@ class LightRAG: entity_name = entity_data["entity_name"] entity_type = entity_data.get("entity_type", "UNKNOWN") description = entity_data.get("description", "No description provided") - # source_id = entity_data["source_id"] source_chunk_id = entity_data.get("source_id", "UNKNOWN") source_id = chunk_to_source_map.get(source_chunk_id, "UNKNOWN") @@ -1039,7 +1039,6 @@ class LightRAG: description = relationship_data["description"] keywords = relationship_data["keywords"] weight = relationship_data.get("weight", 1.0) - # source_id = relationship_data["source_id"] source_chunk_id = relationship_data.get("source_id", "UNKNOWN") source_id = chunk_to_source_map.get(source_chunk_id, "UNKNOWN") @@ -1079,34 +1078,43 @@ class LightRAG: "tgt_id": tgt_id, "description": description, "keywords": keywords, + "source_id": source_id, + "weight": weight, } all_relationships_data.append(edge_data) update_storage = True - # Insert entities into vector storage if needed + # Insert entities into vector storage with consistent format data_for_vdb = { compute_mdhash_id(dp["entity_name"], prefix="ent-"): { - "content": dp["entity_name"] + dp["description"], + "content": dp["entity_name"] + "\n" + dp["description"], "entity_name": dp["entity_name"], + "source_id": dp["source_id"], + "description": dp["description"], + "entity_type": dp["entity_type"], } for dp in all_entities_data } await self.entities_vdb.upsert(data_for_vdb) - # Insert relationships into vector storage if needed + # Insert relationships into vector storage with consistent format data_for_vdb = { compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): { "src_id": dp["src_id"], "tgt_id": dp["tgt_id"], - "content": dp["keywords"] - + dp["src_id"] - + dp["tgt_id"] - + dp["description"], + "source_id": dp["source_id"], + "content": f"{dp['keywords']}\t{dp['src_id']}\n{dp['tgt_id']}\n{dp['description']}", + "keywords": dp["keywords"], + "description": dp["description"], + "weight": dp["weight"], } for dp in all_relationships_data } await self.relationships_vdb.upsert(data_for_vdb) + except Exception as e: + logger.error(f"Error in ainsert_custom_kg: {e}") + raise finally: if update_storage: await self._insert_done() @@ -1759,3 +1767,461 @@ class LightRAG: def clear_cache(self, modes: list[str] | None = None) -> None: """Synchronous version of aclear_cache.""" return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) + + async def aedit_entity( + self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + ) -> dict[str, Any]: + """Asynchronously edit entity information. + + Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + + Args: + entity_name: Name of the entity to edit + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} + allow_rename: Whether to allow entity renaming, defaults to True + + Returns: + Dictionary containing updated entity information + """ + try: + # 1. Get current entity information + node_data = await self.chunk_entity_relation_graph.get_node(entity_name) + if not node_data: + raise ValueError(f"Entity '{entity_name}' does not exist") + + # Check if entity is being renamed + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + # If renaming, check if new name already exists + if is_renaming: + if not allow_rename: + raise ValueError( + "Entity renaming is not allowed. Set allow_rename=True to enable this feature" + ) + + existing_node = await self.chunk_entity_relation_graph.get_node( + new_entity_name + ) + if existing_node: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" + ) + + # 2. Update entity information in the graph + new_node_data = {**node_data, **updated_data} + if "entity_name" in new_node_data: + del new_node_data[ + "entity_name" + ] # Node data should not contain entity_name field + + # If renaming entity + if is_renaming: + logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") + + # Create new entity + await self.chunk_entity_relation_graph.upsert_node( + new_entity_name, new_node_data + ) + + # Get all edges related to the original entity + edges = await self.chunk_entity_relation_graph.get_node_edges( + entity_name + ) + if edges: + # Recreate edges for the new entity + for source, target in edges: + edge_data = await self.chunk_entity_relation_graph.get_edge( + source, target + ) + if edge_data: + if source == entity_name: + await self.chunk_entity_relation_graph.upsert_edge( + new_entity_name, target, edge_data + ) + else: # target == entity_name + await self.chunk_entity_relation_graph.upsert_edge( + source, new_entity_name, edge_data + ) + + # Delete old entity + await self.chunk_entity_relation_graph.delete_node(entity_name) + + # Delete old entity record from vector database + old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await self.entities_vdb.delete([old_entity_id]) + + # Update working entity name to new name + entity_name = new_entity_name + else: + # If not renaming, directly update node data + await self.chunk_entity_relation_graph.upsert_node( + entity_name, new_node_data + ) + + # 3. Recalculate entity's vector representation and update vector database + description = new_node_data.get("description", "") + source_id = new_node_data.get("source_id", "") + entity_type = new_node_data.get("entity_type", "") + content = entity_name + "\n" + description + + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + # Update vector database + await self.entities_vdb.upsert(entity_data) + + # 4. Save changes + await self._edit_entity_done() + + logger.info(f"Entity '{entity_name}' successfully updated") + return await self.get_entity_info(entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while editing entity '{entity_name}': {e}") + raise + + def edit_entity( + self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True + ) -> dict[str, Any]: + """Synchronously edit entity information. + + Updates entity information in the knowledge graph and re-embeds the entity in the vector database. + + Args: + entity_name: Name of the entity to edit + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} + allow_rename: Whether to allow entity renaming, defaults to True + + Returns: + Dictionary containing updated entity information + """ + loop = always_get_an_event_loop() + return loop.run_until_complete( + self.aedit_entity(entity_name, updated_data, allow_rename) + ) + + async def _edit_entity_done(self) -> None: + """Callback after entity editing is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + self.entities_vdb, + self.chunk_entity_relation_graph, + ] + ] + ) + + async def aedit_relation( + self, source_entity: str, target_entity: str, updated_data: dict[str, Any] + ) -> dict[str, Any]: + """Asynchronously edit relation information. + + Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. + + Args: + source_entity: Name of the source entity + target_entity: Name of the target entity + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"} + + Returns: + Dictionary containing updated relation information + """ + try: + # 1. Get current relation information + edge_data = await self.chunk_entity_relation_graph.get_edge( + source_entity, target_entity + ) + if not edge_data: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' does not exist" + ) + + # 2. Update relation information in the graph + new_edge_data = {**edge_data, **updated_data} + await self.chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, new_edge_data + ) + + # 3. Recalculate relation's vector representation and update vector database + description = new_edge_data.get("description", "") + keywords = new_edge_data.get("keywords", "") + source_id = new_edge_data.get("source_id", "") + weight = float(new_edge_data.get("weight", 1.0)) + + # Create content for embedding + content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" + + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await self.relationships_vdb.upsert(relation_data) + + # 4. Save changes + await self._edit_relation_done() + + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully updated" + ) + return await self.get_relation_info( + source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise + + def edit_relation( + self, source_entity: str, target_entity: str, updated_data: dict[str, Any] + ) -> dict[str, Any]: + """Synchronously edit relation information. + + Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. + + Args: + source_entity: Name of the source entity + target_entity: Name of the target entity + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "keywords"} + + Returns: + Dictionary containing updated relation information + """ + loop = always_get_an_event_loop() + return loop.run_until_complete( + self.aedit_relation(source_entity, target_entity, updated_data) + ) + + async def _edit_relation_done(self) -> None: + """Callback after relation editing is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + self.relationships_vdb, + self.chunk_entity_relation_graph, + ] + ] + ) + + async def acreate_entity( + self, entity_name: str, entity_data: dict[str, Any] + ) -> dict[str, Any]: + """Asynchronously create a new entity. + + Creates a new entity in the knowledge graph and adds it to the vector database. + + Args: + entity_name: Name of the new entity + entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"} + + Returns: + Dictionary containing created entity information + """ + try: + # Check if entity already exists + existing_node = await self.chunk_entity_relation_graph.get_node(entity_name) + if existing_node: + raise ValueError(f"Entity '{entity_name}' already exists") + + # Prepare node data with defaults if missing + node_data = { + "entity_type": entity_data.get("entity_type", "UNKNOWN"), + "description": entity_data.get("description", ""), + "source_id": entity_data.get("source_id", "manual"), + } + + # Add entity to knowledge graph + await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data) + + # Prepare content for entity + description = node_data.get("description", "") + source_id = node_data.get("source_id", "") + entity_type = node_data.get("entity_type", "") + content = entity_name + "\n" + description + + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + # Update vector database + await self.entities_vdb.upsert(entity_data_for_vdb) + + # Save changes + await self._edit_entity_done() + + logger.info(f"Entity '{entity_name}' successfully created") + return await self.get_entity_info(entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while creating entity '{entity_name}': {e}") + raise + + def create_entity( + self, entity_name: str, entity_data: dict[str, Any] + ) -> dict[str, Any]: + """Synchronously create a new entity. + + Creates a new entity in the knowledge graph and adds it to the vector database. + + Args: + entity_name: Name of the new entity + entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"} + + Returns: + Dictionary containing created entity information + """ + loop = always_get_an_event_loop() + return loop.run_until_complete(self.acreate_entity(entity_name, entity_data)) + + async def acreate_relation( + self, source_entity: str, target_entity: str, relation_data: dict[str, Any] + ) -> dict[str, Any]: + """Asynchronously create a new relation between entities. + + Creates a new relation (edge) in the knowledge graph and adds it to the vector database. + + Args: + source_entity: Name of the source entity + target_entity: Name of the target entity + relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"} + + Returns: + Dictionary containing created relation information + """ + try: + # Check if both entities exist + source_exists = await self.chunk_entity_relation_graph.has_node( + source_entity + ) + target_exists = await self.chunk_entity_relation_graph.has_node( + target_entity + ) + + if not source_exists: + raise ValueError(f"Source entity '{source_entity}' does not exist") + if not target_exists: + raise ValueError(f"Target entity '{target_entity}' does not exist") + + # Check if relation already exists + existing_edge = await self.chunk_entity_relation_graph.get_edge( + source_entity, target_entity + ) + if existing_edge: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' already exists" + ) + + # Prepare edge data with defaults if missing + edge_data = { + "description": relation_data.get("description", ""), + "keywords": relation_data.get("keywords", ""), + "source_id": relation_data.get("source_id", "manual"), + "weight": float(relation_data.get("weight", 1.0)), + } + + # Add relation to knowledge graph + await self.chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, edge_data + ) + + # Prepare content for embedding + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = edge_data.get("weight", 1.0) + + # Create content for embedding + content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" + + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + + # Prepare data for vector database update + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await self.relationships_vdb.upsert(relation_data_for_vdb) + + # Save changes + await self._edit_relation_done() + + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully created" + ) + return await self.get_relation_info( + source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise + + def create_relation( + self, source_entity: str, target_entity: str, relation_data: dict[str, Any] + ) -> dict[str, Any]: + """Synchronously create a new relation between entities. + + Creates a new relation (edge) in the knowledge graph and adds it to the vector database. + + Args: + source_entity: Name of the source entity + target_entity: Name of the target entity + relation_data: Dictionary containing relation attributes, e.g. {"description": "description", "keywords": "keywords"} + + Returns: + Dictionary containing created relation information + """ + loop = always_get_an_event_loop() + return loop.run_until_complete( + self.acreate_relation(source_entity, target_entity, relation_data) + )