diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 54a41843..6a10a295 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -11,12 +11,18 @@ from functools import partial from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal import pandas as pd +from .kg.shared_storage import get_graph_db_lock from lightrag.kg import ( STORAGES, verify_storage_implementation, ) +from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, +) + from .base import ( BaseGraphStorage, BaseKVStorage, @@ -779,10 +785,6 @@ class LightRAG: 3. Process each chunk for entity and relation extraction 4. Update the document status """ - from lightrag.kg.shared_storage import ( - get_namespace_data, - get_pipeline_status_lock, - ) # Get pipeline status shared data and lock pipeline_status = await get_namespace_data("pipeline_status") @@ -1431,19 +1433,21 @@ class LightRAG: loop = always_get_an_event_loop() return loop.run_until_complete(self.adelete_by_entity(entity_name)) - # TODO: Lock all KG relative DB to esure consistency across multiple processes async def adelete_by_entity(self, entity_name: str) -> None: - try: - await self.entities_vdb.delete_entity(entity_name) - await self.relationships_vdb.delete_entity_relation(entity_name) - await self.chunk_entity_relation_graph.delete_node(entity_name) + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + await self.entities_vdb.delete_entity(entity_name) + await self.relationships_vdb.delete_entity_relation(entity_name) + await self.chunk_entity_relation_graph.delete_node(entity_name) - logger.info( - f"Entity '{entity_name}' and its relationships have been deleted." - ) - await self._delete_by_entity_done() - except Exception as e: - logger.error(f"Error while deleting entity '{entity_name}': {e}") + logger.info( + f"Entity '{entity_name}' and its relationships have been deleted." + ) + await self._delete_by_entity_done() + except Exception as e: + logger.error(f"Error while deleting entity '{entity_name}': {e}") async def _delete_by_entity_done(self) -> None: await asyncio.gather( @@ -1469,7 +1473,6 @@ class LightRAG: self.adelete_by_relation(source_entity, target_entity) ) - # TODO: Lock all KG relative DB to esure consistency across multiple processes async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: """Asynchronously delete a relation between two entities. 
@@ -1477,37 +1480,40 @@ class LightRAG: source_entity: Name of the source entity target_entity: Name of the target entity """ - try: - # TODO: check if has_edge function works on reverse relation - # Check if the relation exists - edge_exists = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if not edge_exists: - logger.warning( - f"Relation from '{source_entity}' to '{target_entity}' does not exist" + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # TODO: check if has_edge function works on reverse relation + # Check if the relation exists + edge_exists = await self.chunk_entity_relation_graph.has_edge( + source_entity, target_entity ) - return + if not edge_exists: + logger.warning( + f"Relation from '{source_entity}' to '{target_entity}' does not exist" + ) + return - # Delete relation from vector database - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await self.relationships_vdb.delete([relation_id]) + # Delete relation from vector database + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + await self.relationships_vdb.delete([relation_id]) - # Delete relation from knowledge graph - await self.chunk_entity_relation_graph.remove_edges( - [(source_entity, target_entity)] - ) + # Delete relation from knowledge graph + await self.chunk_entity_relation_graph.remove_edges( + [(source_entity, target_entity)] + ) - logger.info( - f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" - ) - await self._delete_relation_done() - except Exception as e: - logger.error( - f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" - ) + logger.info( + f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + ) + await self._delete_relation_done() + except Exception as e: + logger.error( + f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" + ) async def _delete_relation_done(self) -> None: """Callback after relation deletion is complete""" @@ -1539,7 +1545,8 @@ class LightRAG: """ return await self.doc_status.get_docs_by_status(status) - # TODO: Lock all KG relative DB to esure consistency across multiple processes + # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.) + # Document delete is not working properly for most of the storage implementations. async def adelete_by_doc_id(self, doc_id: str) -> None: """Delete a document and all its related data @@ -1898,7 +1905,6 @@ class LightRAG: """Synchronous version of aclear_cache.""" return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) - # TODO: Lock all KG relative DB to esure consistency across multiple processes async def aedit_entity( self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: @@ -1914,169 +1920,172 @@ class LightRAG: Returns: Dictionary containing updated entity information """ - try: - # 1. 
Get current entity information - node_exists = await self.chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Entity '{entity_name}' does not exist") - node_data = await self.chunk_entity_relation_graph.get_node(entity_name) + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # 1. Get current entity information + node_exists = await self.chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Entity '{entity_name}' does not exist") + node_data = await self.chunk_entity_relation_graph.get_node(entity_name) - # Check if entity is being renamed - new_entity_name = updated_data.get("entity_name", entity_name) - is_renaming = new_entity_name != entity_name + # Check if entity is being renamed + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name - # If renaming, check if new name already exists - if is_renaming: - if not allow_rename: - raise ValueError( - "Entity renaming is not allowed. Set allow_rename=True to enable this feature" - ) - - existing_node = await self.chunk_entity_relation_graph.has_node( - new_entity_name - ) - if existing_node: - raise ValueError( - f"Entity name '{new_entity_name}' already exists, cannot rename" - ) - - # 2. Update entity information in the graph - new_node_data = {**node_data, **updated_data} - new_node_data["entity_id"] = new_entity_name - - if "entity_name" in new_node_data: - del new_node_data[ - "entity_name" - ] # Node data should not contain entity_name field - - # If renaming entity - if is_renaming: - logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") - - # Create new entity - await self.chunk_entity_relation_graph.upsert_node( - new_entity_name, new_node_data - ) - - # Store relationships that need to be updated - relations_to_update = [] - relations_to_delete = [] - # Get all edges related to the original entity - edges = await self.chunk_entity_relation_graph.get_node_edges( - entity_name - ) - if edges: - # Recreate edges for the new entity - for source, target in edges: - edge_data = await self.chunk_entity_relation_graph.get_edge( - source, target + # If renaming, check if new name already exists + if is_renaming: + if not allow_rename: + raise ValueError( + "Entity renaming is not allowed. Set allow_rename=True to enable this feature" ) - if edge_data: - relations_to_delete.append( - compute_mdhash_id(source + target, prefix="rel-") + + existing_node = await self.chunk_entity_relation_graph.has_node( + new_entity_name + ) + if existing_node: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" + ) + + # 2. 
Update entity information in the graph + new_node_data = {**node_data, **updated_data} + new_node_data["entity_id"] = new_entity_name + + if "entity_name" in new_node_data: + del new_node_data[ + "entity_name" + ] # Node data should not contain entity_name field + + # If renaming entity + if is_renaming: + logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") + + # Create new entity + await self.chunk_entity_relation_graph.upsert_node( + new_entity_name, new_node_data + ) + + # Store relationships that need to be updated + relations_to_update = [] + relations_to_delete = [] + # Get all edges related to the original entity + edges = await self.chunk_entity_relation_graph.get_node_edges( + entity_name + ) + if edges: + # Recreate edges for the new entity + for source, target in edges: + edge_data = await self.chunk_entity_relation_graph.get_edge( + source, target ) - relations_to_delete.append( - compute_mdhash_id(target + source, prefix="rel-") - ) - if source == entity_name: - await self.chunk_entity_relation_graph.upsert_edge( - new_entity_name, target, edge_data + if edge_data: + relations_to_delete.append( + compute_mdhash_id(source + target, prefix="rel-") ) - relations_to_update.append( - (new_entity_name, target, edge_data) - ) - else: # target == entity_name - await self.chunk_entity_relation_graph.upsert_edge( - source, new_entity_name, edge_data - ) - relations_to_update.append( - (source, new_entity_name, edge_data) + relations_to_delete.append( + compute_mdhash_id(target + source, prefix="rel-") ) + if source == entity_name: + await self.chunk_entity_relation_graph.upsert_edge( + new_entity_name, target, edge_data + ) + relations_to_update.append( + (new_entity_name, target, edge_data) + ) + else: # target == entity_name + await self.chunk_entity_relation_graph.upsert_edge( + source, new_entity_name, edge_data + ) + relations_to_update.append( + (source, new_entity_name, edge_data) + ) - # Delete old entity - await self.chunk_entity_relation_graph.delete_node(entity_name) + # Delete old entity + await self.chunk_entity_relation_graph.delete_node(entity_name) - # Delete old entity record from vector database - old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await self.entities_vdb.delete([old_entity_id]) - logger.info( - f"Deleted old entity '{entity_name}' and its vector embedding from database" - ) + # Delete old entity record from vector database + old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await self.entities_vdb.delete([old_entity_id]) + logger.info( + f"Deleted old entity '{entity_name}' and its vector embedding from database" + ) - # Delete old relation records from vector database - await self.relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) + # Delete old relation records from vector database + await self.relationships_vdb.delete(relations_to_delete) + logger.info( + f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" + ) - # Update relationship vector representations - for src, tgt, edge_data in relations_to_update: - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) + # Update relationship vector representations + for src, tgt, edge_data in relations_to_update: + description = edge_data.get("description", "") + 
keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) - # Create new content for embedding - content = f"{src}\t{tgt}\n{keywords}\n{description}" + # Create new content for embedding + content = f"{src}\t{tgt}\n{keywords}\n{description}" - # Calculate relationship ID - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + # Calculate relationship ID + relation_id = compute_mdhash_id(src + tgt, prefix="rel-") - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": src, - "tgt_id": tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": src, + "tgt_id": tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } } + + # Update vector database + await self.relationships_vdb.upsert(relation_data) + + # Update working entity name to new name + entity_name = new_entity_name + else: + # If not renaming, directly update node data + await self.chunk_entity_relation_graph.upsert_node( + entity_name, new_node_data + ) + + # 3. Recalculate entity's vector representation and update vector database + description = new_node_data.get("description", "") + source_id = new_node_data.get("source_id", "") + entity_type = new_node_data.get("entity_type", "") + content = entity_name + "\n" + description + + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, } - - # Update vector database - await self.relationships_vdb.upsert(relation_data) - - # Update working entity name to new name - entity_name = new_entity_name - else: - # If not renaming, directly update node data - await self.chunk_entity_relation_graph.upsert_node( - entity_name, new_node_data - ) - - # 3. Recalculate entity's vector representation and update vector database - description = new_node_data.get("description", "") - source_id = new_node_data.get("source_id", "") - entity_type = new_node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, } - } - # Update vector database - await self.entities_vdb.upsert(entity_data) + # Update vector database + await self.entities_vdb.upsert(entity_data) - # 4. Save changes - await self._edit_entity_done() + # 4. 
Save changes + await self._edit_entity_done() - logger.info(f"Entity '{entity_name}' successfully updated") - return await self.get_entity_info(entity_name, include_vector_data=True) - except Exception as e: - logger.error(f"Error while editing entity '{entity_name}': {e}") - raise + logger.info(f"Entity '{entity_name}' successfully updated") + return await self.get_entity_info(entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while editing entity '{entity_name}': {e}") + raise def edit_entity( self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True @@ -2111,7 +2120,6 @@ class LightRAG: ] ) - # TODO: Lock all KG relative DB to esure consistency across multiple processes async def aedit_relation( self, source_entity: str, target_entity: str, updated_data: dict[str, Any] ) -> dict[str, Any]: @@ -2127,77 +2135,80 @@ class LightRAG: Returns: Dictionary containing updated relation information """ - try: - # 1. Get current relation information - edge_exists = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if not edge_exists: - raise ValueError( - f"Relation from '{source_entity}' to '{target_entity}' does not exist" + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # 1. Get current relation information + edge_exists = await self.chunk_entity_relation_graph.has_edge( + source_entity, target_entity + ) + if not edge_exists: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' does not exist" + ) + edge_data = await self.chunk_entity_relation_graph.get_edge( + source_entity, target_entity + ) + # Important: First delete the old relation record from the vector database + old_relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + await self.relationships_vdb.delete([old_relation_id]) + logger.info( + f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" ) - edge_data = await self.chunk_entity_relation_graph.get_edge( - source_entity, target_entity - ) - # Important: First delete the old relation record from the vector database - old_relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await self.relationships_vdb.delete([old_relation_id]) - logger.info( - f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" - ) - # 2. Update relation information in the graph - new_edge_data = {**edge_data, **updated_data} - await self.chunk_entity_relation_graph.upsert_edge( - source_entity, target_entity, new_edge_data - ) + # 2. Update relation information in the graph + new_edge_data = {**edge_data, **updated_data} + await self.chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, new_edge_data + ) - # 3. Recalculate relation's vector representation and update vector database - description = new_edge_data.get("description", "") - keywords = new_edge_data.get("keywords", "") - source_id = new_edge_data.get("source_id", "") - weight = float(new_edge_data.get("weight", 1.0)) + # 3. 
Recalculate relation's vector representation and update vector database + description = new_edge_data.get("description", "") + keywords = new_edge_data.get("keywords", "") + source_id = new_edge_data.get("source_id", "") + weight = float(new_edge_data.get("weight", 1.0)) - # Create content for embedding - content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}" + # Create content for embedding + content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}" - # Calculate relation ID - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": source_entity, - "tgt_id": target_entity, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } } - } - # Update vector database - await self.relationships_vdb.upsert(relation_data) + # Update vector database + await self.relationships_vdb.upsert(relation_data) - # 4. Save changes - await self._edit_relation_done() + # 4. Save changes + await self._edit_relation_done() - logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully updated" - ) - return await self.get_relation_info( - source_entity, target_entity, include_vector_data=True - ) - except Exception as e: - logger.error( - f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}" - ) - raise + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully updated" + ) + return await self.get_relation_info( + source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise def edit_relation( self, source_entity: str, target_entity: str, updated_data: dict[str, Any] @@ -2245,54 +2256,57 @@ class LightRAG: Returns: Dictionary containing created entity information """ - try: - # Check if entity already exists - existing_node = await self.chunk_entity_relation_graph.has_node(entity_name) - if existing_node: - raise ValueError(f"Entity '{entity_name}' already exists") + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Check if entity already exists + existing_node = await self.chunk_entity_relation_graph.has_node(entity_name) + if existing_node: + raise ValueError(f"Entity '{entity_name}' already exists") - # Prepare node data with defaults if missing - node_data = { - "entity_id": entity_name, - "entity_type": entity_data.get("entity_type", "UNKNOWN"), - "description": entity_data.get("description", ""), - "source_id": entity_data.get("source_id", "manual"), - } - - # Add entity to knowledge graph - await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data) - - # Prepare content for entity - description = node_data.get("description", "") - source_id = node_data.get("source_id", "") - entity_type = node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate 
entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, + # Prepare node data with defaults if missing + node_data = { + "entity_id": entity_name, + "entity_type": entity_data.get("entity_type", "UNKNOWN"), + "description": entity_data.get("description", ""), + "source_id": entity_data.get("source_id", "manual"), } - } - # Update vector database - await self.entities_vdb.upsert(entity_data_for_vdb) + # Add entity to knowledge graph + await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data) - # Save changes - await self._edit_entity_done() + # Prepare content for entity + description = node_data.get("description", "") + source_id = node_data.get("source_id", "") + entity_type = node_data.get("entity_type", "") + content = entity_name + "\n" + description - logger.info(f"Entity '{entity_name}' successfully created") - return await self.get_entity_info(entity_name, include_vector_data=True) - except Exception as e: - logger.error(f"Error while creating entity '{entity_name}': {e}") - raise + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + # Update vector database + await self.entities_vdb.upsert(entity_data_for_vdb) + + # Save changes + await self._edit_entity_done() + + logger.info(f"Entity '{entity_name}' successfully created") + return await self.get_entity_info(entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while creating entity '{entity_name}': {e}") + raise def create_entity( self, entity_name: str, entity_data: dict[str, Any] @@ -2325,86 +2339,89 @@ class LightRAG: Returns: Dictionary containing created relation information """ - try: - # Check if both entities exist - source_exists = await self.chunk_entity_relation_graph.has_node( - source_entity - ) - target_exists = await self.chunk_entity_relation_graph.has_node( - target_entity - ) - - if not source_exists: - raise ValueError(f"Source entity '{source_entity}' does not exist") - if not target_exists: - raise ValueError(f"Target entity '{target_entity}' does not exist") - - # Check if relation already exists - existing_edge = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if existing_edge: - raise ValueError( - f"Relation from '{source_entity}' to '{target_entity}' already exists" + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Check if both entities exist + source_exists = await self.chunk_entity_relation_graph.has_node( + source_entity + ) + target_exists = await self.chunk_entity_relation_graph.has_node( + target_entity ) - # Prepare edge data with defaults if missing - edge_data = { - "description": relation_data.get("description", ""), - "keywords": relation_data.get("keywords", ""), - "source_id": relation_data.get("source_id", "manual"), - "weight": float(relation_data.get("weight", 1.0)), - } + if not source_exists: + raise ValueError(f"Source entity '{source_entity}' does not exist") + if not 
target_exists: + raise ValueError(f"Target entity '{target_entity}' does not exist") - # Add relation to knowledge graph - await self.chunk_entity_relation_graph.upsert_edge( - source_entity, target_entity, edge_data - ) + # Check if relation already exists + existing_edge = await self.chunk_entity_relation_graph.has_edge( + source_entity, target_entity + ) + if existing_edge: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' already exists" + ) - # Prepare content for embedding - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = edge_data.get("weight", 1.0) - - # Create content for embedding - content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" - - # Calculate relation ID - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - - # Prepare data for vector database update - relation_data_for_vdb = { - relation_id: { - "content": content, - "src_id": source_entity, - "tgt_id": target_entity, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, + # Prepare edge data with defaults if missing + edge_data = { + "description": relation_data.get("description", ""), + "keywords": relation_data.get("keywords", ""), + "source_id": relation_data.get("source_id", "manual"), + "weight": float(relation_data.get("weight", 1.0)), } - } - # Update vector database - await self.relationships_vdb.upsert(relation_data_for_vdb) + # Add relation to knowledge graph + await self.chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, edge_data + ) - # Save changes - await self._edit_relation_done() + # Prepare content for embedding + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = edge_data.get("weight", 1.0) - logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully created" - ) - return await self.get_relation_info( - source_entity, target_entity, include_vector_data=True - ) - except Exception as e: - logger.error( - f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}" - ) - raise + # Create content for embedding + content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" + + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + + # Prepare data for vector database update + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await self.relationships_vdb.upsert(relation_data_for_vdb) + + # Save changes + await self._edit_relation_done() + + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully created" + ) + return await self.get_relation_info( + source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise def create_relation( self, source_entity: str, target_entity: str, relation_data: dict[str, Any] @@ -2426,7 +2443,6 @@ class LightRAG: self.acreate_relation(source_entity, target_entity, relation_data) ) - # TODO: Lock all KG relative DB to esure consistency across multiple processes async 
def amerge_entities( self, source_entities: list[str], @@ -2454,221 +2470,224 @@ class LightRAG: Returns: Dictionary containing the merged entity information """ - try: - # Default merge strategy - default_strategy = { - "description": "concatenate", - "entity_type": "keep_first", - "source_id": "join_unique", - } - - merge_strategy = ( - default_strategy - if merge_strategy is None - else {**default_strategy, **merge_strategy} - ) - target_entity_data = ( - {} if target_entity_data is None else target_entity_data - ) - - # 1. Check if all source entities exist - source_entities_data = {} - for entity_name in source_entities: - node_exists = await self.chunk_entity_relation_graph.has_node( - entity_name - ) - if not node_exists: - raise ValueError(f"Source entity '{entity_name}' does not exist") - node_data = await self.chunk_entity_relation_graph.get_node(entity_name) - source_entities_data[entity_name] = node_data - - # 2. Check if target entity exists and get its data if it does - target_exists = await self.chunk_entity_relation_graph.has_node( - target_entity - ) - existing_target_entity_data = {} - if target_exists: - existing_target_entity_data = ( - await self.chunk_entity_relation_graph.get_node(target_entity) - ) - logger.info( - f"Target entity '{target_entity}' already exists, will merge data" - ) - - # 3. Merge entity data - merged_entity_data = self._merge_entity_attributes( - list(source_entities_data.values()) - + ([existing_target_entity_data] if target_exists else []), - merge_strategy, - ) - - # Apply any explicitly provided target entity data (overrides merged data) - for key, value in target_entity_data.items(): - merged_entity_data[key] = value - - # 4. Get all relationships of the source entities - all_relations = [] - for entity_name in source_entities: - # Get all relationships of the source entities - edges = await self.chunk_entity_relation_graph.get_node_edges( - entity_name - ) - if edges: - for src, tgt in edges: - # Ensure src is the current entity - if src == entity_name: - edge_data = await self.chunk_entity_relation_graph.get_edge( - src, tgt - ) - all_relations.append((src, tgt, edge_data)) - - # 5. Create or update the target entity - merged_entity_data["entity_id"] = target_entity - if not target_exists: - await self.chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Created new target entity '{target_entity}'") - else: - await self.chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Updated existing target entity '{target_entity}'") - - # 6. 
Recreate all relationships, pointing to the target entity - relation_updates = {} # Track relationships that need to be merged - relations_to_delete = [] - - for src, tgt, edge_data in all_relations: - relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) - relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) - new_src = target_entity if src in source_entities else src - new_tgt = target_entity if tgt in source_entities else tgt - - # Skip relationships between source entities to avoid self-loops - if new_src == new_tgt: - logger.info( - f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" - ) - continue - - # Check if the same relationship already exists - relation_key = f"{new_src}|{new_tgt}" - if relation_key in relation_updates: - # Merge relationship data - existing_data = relation_updates[relation_key]["data"] - merged_relation = self._merge_relation_attributes( - [existing_data, edge_data], - { - "description": "concatenate", - "keywords": "join_unique", - "source_id": "join_unique", - "weight": "max", - }, - ) - relation_updates[relation_key]["data"] = merged_relation - logger.info( - f"Merged duplicate relationship: {new_src} -> {new_tgt}" - ) - else: - relation_updates[relation_key] = { - "src": new_src, - "tgt": new_tgt, - "data": edge_data.copy(), - } - - # Apply relationship updates - for rel_data in relation_updates.values(): - await self.chunk_entity_relation_graph.upsert_edge( - rel_data["src"], rel_data["tgt"], rel_data["data"] - ) - logger.info( - f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" - ) - - # Delete relationships records from vector database - await self.relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) - - # 7. Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description - - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Default merge strategy + default_strategy = { + "description": "concatenate", + "entity_type": "keep_first", + "source_id": "join_unique", } - } - await self.entities_vdb.upsert(entity_data_for_vdb) + merge_strategy = ( + default_strategy + if merge_strategy is None + else {**default_strategy, **merge_strategy} + ) + target_entity_data = ( + {} if target_entity_data is None else target_entity_data + ) - # 8. Update relationship vector representations - for rel_data in relation_updates.values(): - src = rel_data["src"] - tgt = rel_data["tgt"] - edge_data = rel_data["data"] + # 1. 
Check if all source entities exist + source_entities_data = {} + for entity_name in source_entities: + node_exists = await self.chunk_entity_relation_graph.has_node( + entity_name + ) + if not node_exists: + raise ValueError(f"Source entity '{entity_name}' does not exist") + node_data = await self.chunk_entity_relation_graph.get_node(entity_name) + source_entities_data[entity_name] = node_data - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) + # 2. Check if target entity exists and get its data if it does + target_exists = await self.chunk_entity_relation_graph.has_node( + target_entity + ) + existing_target_entity_data = {} + if target_exists: + existing_target_entity_data = ( + await self.chunk_entity_relation_graph.get_node(target_entity) + ) + logger.info( + f"Target entity '{target_entity}' already exists, will merge data" + ) - content = f"{keywords}\t{src}\n{tgt}\n{description}" - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + # 3. Merge entity data + merged_entity_data = self._merge_entity_attributes( + list(source_entities_data.values()) + + ([existing_target_entity_data] if target_exists else []), + merge_strategy, + ) - relation_data_for_vdb = { - relation_id: { + # Apply any explicitly provided target entity data (overrides merged data) + for key, value in target_entity_data.items(): + merged_entity_data[key] = value + + # 4. Get all relationships of the source entities + all_relations = [] + for entity_name in source_entities: + # Get all relationships of the source entities + edges = await self.chunk_entity_relation_graph.get_node_edges( + entity_name + ) + if edges: + for src, tgt in edges: + # Ensure src is the current entity + if src == entity_name: + edge_data = await self.chunk_entity_relation_graph.get_edge( + src, tgt + ) + all_relations.append((src, tgt, edge_data)) + + # 5. Create or update the target entity + merged_entity_data["entity_id"] = target_entity + if not target_exists: + await self.chunk_entity_relation_graph.upsert_node( + target_entity, merged_entity_data + ) + logger.info(f"Created new target entity '{target_entity}'") + else: + await self.chunk_entity_relation_graph.upsert_node( + target_entity, merged_entity_data + ) + logger.info(f"Updated existing target entity '{target_entity}'") + + # 6. 
Recreate all relationships, pointing to the target entity + relation_updates = {} # Track relationships that need to be merged + relations_to_delete = [] + + for src, tgt, edge_data in all_relations: + relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) + relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) + new_src = target_entity if src in source_entities else src + new_tgt = target_entity if tgt in source_entities else tgt + + # Skip relationships between source entities to avoid self-loops + if new_src == new_tgt: + logger.info( + f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" + ) + continue + + # Check if the same relationship already exists + relation_key = f"{new_src}|{new_tgt}" + if relation_key in relation_updates: + # Merge relationship data + existing_data = relation_updates[relation_key]["data"] + merged_relation = self._merge_relation_attributes( + [existing_data, edge_data], + { + "description": "concatenate", + "keywords": "join_unique", + "source_id": "join_unique", + "weight": "max", + }, + ) + relation_updates[relation_key]["data"] = merged_relation + logger.info( + f"Merged duplicate relationship: {new_src} -> {new_tgt}" + ) + else: + relation_updates[relation_key] = { + "src": new_src, + "tgt": new_tgt, + "data": edge_data.copy(), + } + + # Apply relationship updates + for rel_data in relation_updates.values(): + await self.chunk_entity_relation_graph.upsert_edge( + rel_data["src"], rel_data["tgt"], rel_data["data"] + ) + logger.info( + f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" + ) + + # Delete relationships records from vector database + await self.relationships_vdb.delete(relations_to_delete) + logger.info( + f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" + ) + + # 7. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { "content": content, - "src_id": src, - "tgt_id": tgt, + "entity_name": target_entity, "source_id": source_id, "description": description, - "keywords": keywords, - "weight": weight, + "entity_type": entity_type, } } - await self.relationships_vdb.upsert(relation_data_for_vdb) + await self.entities_vdb.upsert(entity_data_for_vdb) + + # 8. Update relationship vector representations + for rel_data in relation_updates.values(): + src = rel_data["src"] + tgt = rel_data["tgt"] + edge_data = rel_data["data"] + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + content = f"{keywords}\t{src}\n{tgt}\n{description}" + relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": src, + "tgt_id": tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + await self.relationships_vdb.upsert(relation_data_for_vdb) + + # 9. 
Delete source entities + for entity_name in source_entities: + if entity_name == target_entity: + logger.info( + f"Skipping deletion of '{entity_name}' as it's also the target entity" + ) + continue + + # Delete entity node from knowledge graph + await self.chunk_entity_relation_graph.delete_node(entity_name) + + # Delete entity record from vector database + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await self.entities_vdb.delete([entity_id]) - # 9. Delete source entities - for entity_name in source_entities: - if entity_name == target_entity: logger.info( - f"Skipping deletion of '{entity_name}' as it's also the target entity" + f"Deleted source entity '{entity_name}' and its vector embedding from database" ) - continue - # Delete entity node from knowledge graph - await self.chunk_entity_relation_graph.delete_node(entity_name) - - # Delete entity record from vector database - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await self.entities_vdb.delete([entity_id]) + # 10. Save changes + await self._merge_entities_done() logger.info( - f"Deleted source entity '{entity_name}' and its vector embedding from database" + f"Successfully merged {len(source_entities)} entities into '{target_entity}'" ) + return await self.get_entity_info(target_entity, include_vector_data=True) - # 10. Save changes - await self._merge_entities_done() - - logger.info( - f"Successfully merged {len(source_entities)} entities into '{target_entity}'" - ) - return await self.get_entity_info(target_entity, include_vector_data=True) - - except Exception as e: - logger.error(f"Error merging entities: {e}") - raise + except Exception as e: + logger.error(f"Error merging entities: {e}") + raise async def aexport_data( self,
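
Every mutation path in this patch now follows the same shape: acquire the shared graph-database lock from `lightrag.kg.shared_storage`, then perform the graph write and its matching vector-database write inside the critical section. Below is a minimal caller-side sketch of what that guarantees. It is not part of the patch: `rag` is assumed to be a fully initialized LightRAG instance configured elsewhere, and the entity and relation names are purely illustrative.

import asyncio

from lightrag import LightRAG


async def concurrent_edits(rag: LightRAG) -> None:
    # Both coroutines acquire the same graph_db_lock internally (see
    # aedit_entity and adelete_by_relation above), so whichever wins runs its
    # graph + vector-db updates to completion before the other starts; neither
    # can observe a half-applied edit.
    await asyncio.gather(
        rag.aedit_entity("Tesla", {"description": "Electric vehicle maker"}),
        rag.adelete_by_relation("Tesla", "Edison"),  # hypothetical relation
    )

    # amerge_entities holds the lock for the entire merge, so no other writer
    # can interleave between steps 1-10 of the merge shown above.
    await rag.amerge_entities(
        source_entities=["Tesla Inc", "Tesla Motors"],  # hypothetical duplicates
        target_entity="Tesla",
    )

The serialization cost is deliberate: edits, deletes, creates, and merges are rare compared to reads, and a single coarse lock keeps each graph mutation and its vector-database counterpart atomic with respect to every other writer across processes.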