diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 6a10a295..f83f9dbb 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -3,15 +3,11 @@ from __future__ import annotations import asyncio import configparser import os -import csv import warnings from dataclasses import asdict, dataclass, field from datetime import datetime from functools import partial from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal -import pandas as pd - -from .kg.shared_storage import get_graph_db_lock from lightrag.kg import ( STORAGES, @@ -1429,111 +1425,60 @@ class LightRAG: async def _query_done(self): await self.llm_response_cache.index_done_callback() - def delete_by_entity(self, entity_name: str) -> None: - loop = always_get_an_event_loop() - return loop.run_until_complete(self.adelete_by_entity(entity_name)) - async def adelete_by_entity(self, entity_name: str) -> None: - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - await self.entities_vdb.delete_entity(entity_name) - await self.relationships_vdb.delete_entity_relation(entity_name) - await self.chunk_entity_relation_graph.delete_node(entity_name) - - logger.info( - f"Entity '{entity_name}' and its relationships have been deleted." - ) - await self._delete_by_entity_done() - except Exception as e: - logger.error(f"Error while deleting entity '{entity_name}': {e}") - - async def _delete_by_entity_done(self) -> None: - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - self.entities_vdb, - self.relationships_vdb, - self.chunk_entity_relation_graph, - ] - ] - ) - - def delete_by_relation(self, source_entity: str, target_entity: str) -> None: - """Synchronously delete a relation between two entities. + async def aclear_cache(self, modes: list[str] | None = None) -> None: + """Clear cache data from the LLM response cache storage. Args: - source_entity: Name of the source entity - target_entity: Name of the target entity + modes (list[str] | None): Modes of cache to clear. Options: ["default", "naive", "local", "global", "hybrid", "mix"]. + "default" represents extraction cache. + If None, clears all cache. + + Example: + # Clear all cache + await rag.aclear_cache() + + # Clear local mode cache + await rag.aclear_cache(modes=["local"]) + + # Clear extraction cache + await rag.aclear_cache(modes=["default"]) """ - loop = always_get_an_event_loop() - return loop.run_until_complete( - self.adelete_by_relation(source_entity, target_entity) - ) + if not self.llm_response_cache: + logger.warning("No cache storage configured") + return - async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: - """Asynchronously delete a relation between two entities. 
+ valid_modes = ["default", "naive", "local", "global", "hybrid", "mix"] - Args: - source_entity: Name of the source entity - target_entity: Name of the target entity - """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # TODO: check if has_edge function works on reverse relation - # Check if the relation exists - edge_exists = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if not edge_exists: - logger.warning( - f"Relation from '{source_entity}' to '{target_entity}' does not exist" - ) - return + # Validate input + if modes and not all(mode in valid_modes for mode in modes): + raise ValueError(f"Invalid mode. Valid modes are: {valid_modes}") - # Delete relation from vector database - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await self.relationships_vdb.delete([relation_id]) + try: + # Reset the cache storage for specified mode + if modes: + success = await self.llm_response_cache.drop_cache_by_modes(modes) + if success: + logger.info(f"Cleared cache for modes: {modes}") + else: + logger.warning(f"Failed to clear cache for modes: {modes}") + else: + # Clear all modes + success = await self.llm_response_cache.drop_cache_by_modes(valid_modes) + if success: + logger.info("Cleared all cache") + else: + logger.warning("Failed to clear all cache") - # Delete relation from knowledge graph - await self.chunk_entity_relation_graph.remove_edges( - [(source_entity, target_entity)] - ) + await self.llm_response_cache.index_done_callback() - logger.info( - f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" - ) - await self._delete_relation_done() - except Exception as e: - logger.error( - f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" - ) + except Exception as e: + logger.error(f"Error while clearing cache: {e}") - async def _delete_relation_done(self) -> None: - """Callback after relation deletion is complete""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - self.relationships_vdb, - self.chunk_entity_relation_graph, - ] - ] - ) + def clear_cache(self, modes: list[str] | None = None) -> None: + """Synchronous version of aclear_cache.""" + return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) - async def get_processing_status(self) -> dict[str, int]: - """Get current document processing status counts - - Returns: - Dict with counts for each status - """ - return await self.doc_status.get_status_counts() async def get_docs_by_status( self, status: DocStatus @@ -1803,107 +1748,76 @@ class LightRAG: except Exception as e: logger.error(f"Error while deleting document {doc_id}: {e}") + + async def adelete_by_entity(self, entity_name: str) -> None: + """Asynchronously delete an entity and all its relationships. 
+ + Args: + entity_name: Name of the entity to delete + """ + from .utils_graph import adelete_by_entity + return await adelete_by_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name + ) + + def delete_by_entity(self, entity_name: str) -> None: + loop = always_get_an_event_loop() + return loop.run_until_complete(self.adelete_by_entity(entity_name)) + + async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: + """Asynchronously delete a relation between two entities. + + Args: + source_entity: Name of the source entity + target_entity: Name of the target entity + """ + from .utils_graph import adelete_by_relation + return await adelete_by_relation( + self.chunk_entity_relation_graph, + self.relationships_vdb, + source_entity, + target_entity + ) + + def delete_by_relation(self, source_entity: str, target_entity: str) -> None: + loop = always_get_an_event_loop() + return loop.run_until_complete(self.adelete_by_relation(source_entity, target_entity)) + + async def get_processing_status(self) -> dict[str, int]: + """Get current document processing status counts + + Returns: + Dict with counts for each status + """ + return await self.doc_status.get_status_counts() + async def get_entity_info( self, entity_name: str, include_vector_data: bool = False ) -> dict[str, str | None | dict[str, str]]: """Get detailed information of an entity""" - - # Get information from the graph - node_data = await self.chunk_entity_relation_graph.get_node(entity_name) - source_id = node_data.get("source_id") if node_data else None - - result: dict[str, str | None | dict[str, str]] = { - "entity_name": entity_name, - "source_id": source_id, - "graph_data": node_data, - } - - # Optional: Get vector database information - if include_vector_data: - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - vector_data = await self.entities_vdb.get_by_id(entity_id) - result["vector_data"] = vector_data - - return result + from .utils_graph import get_entity_info + return await get_entity_info( + self.chunk_entity_relation_graph, + self.entities_vdb, + entity_name, + include_vector_data + ) async def get_relation_info( self, src_entity: str, tgt_entity: str, include_vector_data: bool = False ) -> dict[str, str | None | dict[str, str]]: """Get detailed information of a relationship""" - - # Get information from the graph - edge_data = await self.chunk_entity_relation_graph.get_edge( - src_entity, tgt_entity + from .utils_graph import get_relation_info + return await get_relation_info( + self.chunk_entity_relation_graph, + self.relationships_vdb, + src_entity, + tgt_entity, + include_vector_data ) - source_id = edge_data.get("source_id") if edge_data else None - - result: dict[str, str | None | dict[str, str]] = { - "src_entity": src_entity, - "tgt_entity": tgt_entity, - "source_id": source_id, - "graph_data": edge_data, - } - - # Optional: Get vector database information - if include_vector_data: - rel_id = compute_mdhash_id(src_entity + tgt_entity, prefix="rel-") - vector_data = await self.relationships_vdb.get_by_id(rel_id) - result["vector_data"] = vector_data - - return result - - async def aclear_cache(self, modes: list[str] | None = None) -> None: - """Clear cache data from the LLM response cache storage. - - Args: - modes (list[str] | None): Modes of cache to clear. Options: ["default", "naive", "local", "global", "hybrid", "mix"]. - "default" represents extraction cache. - If None, clears all cache. 
- - Example: - # Clear all cache - await rag.aclear_cache() - - # Clear local mode cache - await rag.aclear_cache(modes=["local"]) - - # Clear extraction cache - await rag.aclear_cache(modes=["default"]) - """ - if not self.llm_response_cache: - logger.warning("No cache storage configured") - return - - valid_modes = ["default", "naive", "local", "global", "hybrid", "mix"] - - # Validate input - if modes and not all(mode in valid_modes for mode in modes): - raise ValueError(f"Invalid mode. Valid modes are: {valid_modes}") - - try: - # Reset the cache storage for specified mode - if modes: - success = await self.llm_response_cache.drop_cache_by_modes(modes) - if success: - logger.info(f"Cleared cache for modes: {modes}") - else: - logger.warning(f"Failed to clear cache for modes: {modes}") - else: - # Clear all modes - success = await self.llm_response_cache.drop_cache_by_modes(valid_modes) - if success: - logger.info("Cleared all cache") - else: - logger.warning("Failed to clear all cache") - - await self.llm_response_cache.index_done_callback() - - except Exception as e: - logger.error(f"Error while clearing cache: {e}") - - def clear_cache(self, modes: list[str] | None = None) -> None: - """Synchronous version of aclear_cache.""" - return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) async def aedit_entity( self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True @@ -1920,206 +1834,24 @@ class LightRAG: Returns: Dictionary containing updated entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # 1. Get current entity information - node_exists = await self.chunk_entity_relation_graph.has_node(entity_name) - if not node_exists: - raise ValueError(f"Entity '{entity_name}' does not exist") - node_data = await self.chunk_entity_relation_graph.get_node(entity_name) - - # Check if entity is being renamed - new_entity_name = updated_data.get("entity_name", entity_name) - is_renaming = new_entity_name != entity_name - - # If renaming, check if new name already exists - if is_renaming: - if not allow_rename: - raise ValueError( - "Entity renaming is not allowed. Set allow_rename=True to enable this feature" - ) - - existing_node = await self.chunk_entity_relation_graph.has_node( - new_entity_name - ) - if existing_node: - raise ValueError( - f"Entity name '{new_entity_name}' already exists, cannot rename" - ) - - # 2. 
Update entity information in the graph - new_node_data = {**node_data, **updated_data} - new_node_data["entity_id"] = new_entity_name - - if "entity_name" in new_node_data: - del new_node_data[ - "entity_name" - ] # Node data should not contain entity_name field - - # If renaming entity - if is_renaming: - logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") - - # Create new entity - await self.chunk_entity_relation_graph.upsert_node( - new_entity_name, new_node_data - ) - - # Store relationships that need to be updated - relations_to_update = [] - relations_to_delete = [] - # Get all edges related to the original entity - edges = await self.chunk_entity_relation_graph.get_node_edges( - entity_name - ) - if edges: - # Recreate edges for the new entity - for source, target in edges: - edge_data = await self.chunk_entity_relation_graph.get_edge( - source, target - ) - if edge_data: - relations_to_delete.append( - compute_mdhash_id(source + target, prefix="rel-") - ) - relations_to_delete.append( - compute_mdhash_id(target + source, prefix="rel-") - ) - if source == entity_name: - await self.chunk_entity_relation_graph.upsert_edge( - new_entity_name, target, edge_data - ) - relations_to_update.append( - (new_entity_name, target, edge_data) - ) - else: # target == entity_name - await self.chunk_entity_relation_graph.upsert_edge( - source, new_entity_name, edge_data - ) - relations_to_update.append( - (source, new_entity_name, edge_data) - ) - - # Delete old entity - await self.chunk_entity_relation_graph.delete_node(entity_name) - - # Delete old entity record from vector database - old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await self.entities_vdb.delete([old_entity_id]) - logger.info( - f"Deleted old entity '{entity_name}' and its vector embedding from database" - ) - - # Delete old relation records from vector database - await self.relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) - - # Update relationship vector representations - for src, tgt, edge_data in relations_to_update: - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - # Create new content for embedding - content = f"{src}\t{tgt}\n{keywords}\n{description}" - - # Calculate relationship ID - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") - - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": src, - "tgt_id": tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - # Update vector database - await self.relationships_vdb.upsert(relation_data) - - # Update working entity name to new name - entity_name = new_entity_name - else: - # If not renaming, directly update node data - await self.chunk_entity_relation_graph.upsert_node( - entity_name, new_node_data - ) - - # 3. 
Recalculate entity's vector representation and update vector database - description = new_node_data.get("description", "") - source_id = new_node_data.get("source_id", "") - entity_type = new_node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - - # Update vector database - await self.entities_vdb.upsert(entity_data) - - # 4. Save changes - await self._edit_entity_done() - - logger.info(f"Entity '{entity_name}' successfully updated") - return await self.get_entity_info(entity_name, include_vector_data=True) - except Exception as e: - logger.error(f"Error while editing entity '{entity_name}': {e}") - raise + from .utils_graph import aedit_entity + return await aedit_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name, + updated_data, + allow_rename + ) def edit_entity( self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: - """Synchronously edit entity information. - - Updates entity information in the knowledge graph and re-embeds the entity in the vector database. - - Args: - entity_name: Name of the entity to edit - updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} - allow_rename: Whether to allow entity renaming, defaults to True - - Returns: - Dictionary containing updated entity information - """ loop = always_get_an_event_loop() return loop.run_until_complete( self.aedit_entity(entity_name, updated_data, allow_rename) ) - async def _edit_entity_done(self) -> None: - """Callback after entity editing is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - self.entities_vdb, - self.relationships_vdb, - self.chunk_entity_relation_graph, - ] - ] - ) - async def aedit_relation( self, source_entity: str, target_entity: str, updated_data: dict[str, Any] ) -> dict[str, Any]: @@ -2135,113 +1867,24 @@ class LightRAG: Returns: Dictionary containing updated relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # 1. Get current relation information - edge_exists = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if not edge_exists: - raise ValueError( - f"Relation from '{source_entity}' to '{target_entity}' does not exist" - ) - edge_data = await self.chunk_entity_relation_graph.get_edge( - source_entity, target_entity - ) - # Important: First delete the old relation record from the vector database - old_relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - await self.relationships_vdb.delete([old_relation_id]) - logger.info( - f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" - ) - - # 2. Update relation information in the graph - new_edge_data = {**edge_data, **updated_data} - await self.chunk_entity_relation_graph.upsert_edge( - source_entity, target_entity, new_edge_data - ) - - # 3. 
Recalculate relation's vector representation and update vector database - description = new_edge_data.get("description", "") - keywords = new_edge_data.get("keywords", "") - source_id = new_edge_data.get("source_id", "") - weight = float(new_edge_data.get("weight", 1.0)) - - # Create content for embedding - content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}" - - # Calculate relation ID - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - - # Prepare data for vector database update - relation_data = { - relation_id: { - "content": content, - "src_id": source_entity, - "tgt_id": target_entity, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - # Update vector database - await self.relationships_vdb.upsert(relation_data) - - # 4. Save changes - await self._edit_relation_done() - - logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully updated" - ) - return await self.get_relation_info( - source_entity, target_entity, include_vector_data=True - ) - except Exception as e: - logger.error( - f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}" - ) - raise + from .utils_graph import aedit_relation + return await aedit_relation( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + source_entity, + target_entity, + updated_data + ) def edit_relation( self, source_entity: str, target_entity: str, updated_data: dict[str, Any] ) -> dict[str, Any]: - """Synchronously edit relation information. - - Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. - - Args: - source_entity: Name of the source entity - target_entity: Name of the target entity - updated_data: Dictionary containing updated attributes, e.g. 
{"description": "new description", "keywords": "keywords"} - - Returns: - Dictionary containing updated relation information - """ loop = always_get_an_event_loop() return loop.run_until_complete( self.aedit_relation(source_entity, target_entity, updated_data) ) - async def _edit_relation_done(self) -> None: - """Callback after relation editing is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - self.relationships_vdb, - self.chunk_entity_relation_graph, - ] - ] - ) - async def acreate_entity( self, entity_name: str, entity_data: dict[str, Any] ) -> dict[str, Any]: @@ -2256,71 +1899,18 @@ class LightRAG: Returns: Dictionary containing created entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Check if entity already exists - existing_node = await self.chunk_entity_relation_graph.has_node(entity_name) - if existing_node: - raise ValueError(f"Entity '{entity_name}' already exists") - - # Prepare node data with defaults if missing - node_data = { - "entity_id": entity_name, - "entity_type": entity_data.get("entity_type", "UNKNOWN"), - "description": entity_data.get("description", ""), - "source_id": entity_data.get("source_id", "manual"), - } - - # Add entity to knowledge graph - await self.chunk_entity_relation_graph.upsert_node(entity_name, node_data) - - # Prepare content for entity - description = node_data.get("description", "") - source_id = node_data.get("source_id", "") - entity_type = node_data.get("entity_type", "") - content = entity_name + "\n" + description - - # Calculate entity ID - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - - # Prepare data for vector database update - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": entity_name, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - - # Update vector database - await self.entities_vdb.upsert(entity_data_for_vdb) - - # Save changes - await self._edit_entity_done() - - logger.info(f"Entity '{entity_name}' successfully created") - return await self.get_entity_info(entity_name, include_vector_data=True) - except Exception as e: - logger.error(f"Error while creating entity '{entity_name}': {e}") - raise + from .utils_graph import acreate_entity + return await acreate_entity( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + entity_name, + entity_data + ) def create_entity( self, entity_name: str, entity_data: dict[str, Any] ) -> dict[str, Any]: - """Synchronously create a new entity. - - Creates a new entity in the knowledge graph and adds it to the vector database. - Args: - entity_name: Name of the new entity - entity_data: Dictionary containing entity attributes, e.g. 
{"description": "description", "entity_type": "type"} - - Returns: - Dictionary containing created entity information - """ loop = always_get_an_event_loop() return loop.run_until_complete(self.acreate_entity(entity_name, entity_data)) @@ -2339,105 +1929,19 @@ class LightRAG: Returns: Dictionary containing created relation information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Check if both entities exist - source_exists = await self.chunk_entity_relation_graph.has_node( - source_entity - ) - target_exists = await self.chunk_entity_relation_graph.has_node( - target_entity - ) - - if not source_exists: - raise ValueError(f"Source entity '{source_entity}' does not exist") - if not target_exists: - raise ValueError(f"Target entity '{target_entity}' does not exist") - - # Check if relation already exists - existing_edge = await self.chunk_entity_relation_graph.has_edge( - source_entity, target_entity - ) - if existing_edge: - raise ValueError( - f"Relation from '{source_entity}' to '{target_entity}' already exists" - ) - - # Prepare edge data with defaults if missing - edge_data = { - "description": relation_data.get("description", ""), - "keywords": relation_data.get("keywords", ""), - "source_id": relation_data.get("source_id", "manual"), - "weight": float(relation_data.get("weight", 1.0)), - } - - # Add relation to knowledge graph - await self.chunk_entity_relation_graph.upsert_edge( - source_entity, target_entity, edge_data - ) - - # Prepare content for embedding - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = edge_data.get("weight", 1.0) - - # Create content for embedding - content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" - - # Calculate relation ID - relation_id = compute_mdhash_id( - source_entity + target_entity, prefix="rel-" - ) - - # Prepare data for vector database update - relation_data_for_vdb = { - relation_id: { - "content": content, - "src_id": source_entity, - "tgt_id": target_entity, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - # Update vector database - await self.relationships_vdb.upsert(relation_data_for_vdb) - - # Save changes - await self._edit_relation_done() - - logger.info( - f"Relation from '{source_entity}' to '{target_entity}' successfully created" - ) - return await self.get_relation_info( - source_entity, target_entity, include_vector_data=True - ) - except Exception as e: - logger.error( - f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}" - ) - raise + from .utils_graph import acreate_relation + return await acreate_relation( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + source_entity, + target_entity, + relation_data + ) def create_relation( self, source_entity: str, target_entity: str, relation_data: dict[str, Any] ) -> dict[str, Any]: - """Synchronously create a new relation between entities. - - Creates a new relation (edge) in the knowledge graph and adds it to the vector database. - - Args: - source_entity: Name of the source entity - target_entity: Name of the target entity - relation_data: Dictionary containing relation attributes, e.g. 
{"description": "description", "keywords": "keywords"} - - Returns: - Dictionary containing created relation information - """ loop = always_get_an_event_loop() return loop.run_until_complete( self.acreate_relation(source_entity, target_entity, relation_data) @@ -2470,224 +1974,30 @@ class LightRAG: Returns: Dictionary containing the merged entity information """ - graph_db_lock = get_graph_db_lock(enable_logging=False) - # Use graph database lock to ensure atomic graph and vector db operations - async with graph_db_lock: - try: - # Default merge strategy - default_strategy = { - "description": "concatenate", - "entity_type": "keep_first", - "source_id": "join_unique", - } + from .utils_graph import amerge_entities + return await amerge_entities( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + source_entities, + target_entity, + merge_strategy, + target_entity_data + ) - merge_strategy = ( - default_strategy - if merge_strategy is None - else {**default_strategy, **merge_strategy} - ) - target_entity_data = ( - {} if target_entity_data is None else target_entity_data - ) - - # 1. Check if all source entities exist - source_entities_data = {} - for entity_name in source_entities: - node_exists = await self.chunk_entity_relation_graph.has_node( - entity_name - ) - if not node_exists: - raise ValueError(f"Source entity '{entity_name}' does not exist") - node_data = await self.chunk_entity_relation_graph.get_node(entity_name) - source_entities_data[entity_name] = node_data - - # 2. Check if target entity exists and get its data if it does - target_exists = await self.chunk_entity_relation_graph.has_node( - target_entity - ) - existing_target_entity_data = {} - if target_exists: - existing_target_entity_data = ( - await self.chunk_entity_relation_graph.get_node(target_entity) - ) - logger.info( - f"Target entity '{target_entity}' already exists, will merge data" - ) - - # 3. Merge entity data - merged_entity_data = self._merge_entity_attributes( - list(source_entities_data.values()) - + ([existing_target_entity_data] if target_exists else []), - merge_strategy, - ) - - # Apply any explicitly provided target entity data (overrides merged data) - for key, value in target_entity_data.items(): - merged_entity_data[key] = value - - # 4. Get all relationships of the source entities - all_relations = [] - for entity_name in source_entities: - # Get all relationships of the source entities - edges = await self.chunk_entity_relation_graph.get_node_edges( - entity_name - ) - if edges: - for src, tgt in edges: - # Ensure src is the current entity - if src == entity_name: - edge_data = await self.chunk_entity_relation_graph.get_edge( - src, tgt - ) - all_relations.append((src, tgt, edge_data)) - - # 5. Create or update the target entity - merged_entity_data["entity_id"] = target_entity - if not target_exists: - await self.chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Created new target entity '{target_entity}'") - else: - await self.chunk_entity_relation_graph.upsert_node( - target_entity, merged_entity_data - ) - logger.info(f"Updated existing target entity '{target_entity}'") - - # 6. 
Recreate all relationships, pointing to the target entity - relation_updates = {} # Track relationships that need to be merged - relations_to_delete = [] - - for src, tgt, edge_data in all_relations: - relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) - relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) - new_src = target_entity if src in source_entities else src - new_tgt = target_entity if tgt in source_entities else tgt - - # Skip relationships between source entities to avoid self-loops - if new_src == new_tgt: - logger.info( - f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" - ) - continue - - # Check if the same relationship already exists - relation_key = f"{new_src}|{new_tgt}" - if relation_key in relation_updates: - # Merge relationship data - existing_data = relation_updates[relation_key]["data"] - merged_relation = self._merge_relation_attributes( - [existing_data, edge_data], - { - "description": "concatenate", - "keywords": "join_unique", - "source_id": "join_unique", - "weight": "max", - }, - ) - relation_updates[relation_key]["data"] = merged_relation - logger.info( - f"Merged duplicate relationship: {new_src} -> {new_tgt}" - ) - else: - relation_updates[relation_key] = { - "src": new_src, - "tgt": new_tgt, - "data": edge_data.copy(), - } - - # Apply relationship updates - for rel_data in relation_updates.values(): - await self.chunk_entity_relation_graph.upsert_edge( - rel_data["src"], rel_data["tgt"], rel_data["data"] - ) - logger.info( - f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" - ) - - # Delete relationships records from vector database - await self.relationships_vdb.delete(relations_to_delete) - logger.info( - f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" - ) - - # 7. Update entity vector representation - description = merged_entity_data.get("description", "") - source_id = merged_entity_data.get("source_id", "") - entity_type = merged_entity_data.get("entity_type", "") - content = target_entity + "\n" + description - - entity_id = compute_mdhash_id(target_entity, prefix="ent-") - entity_data_for_vdb = { - entity_id: { - "content": content, - "entity_name": target_entity, - "source_id": source_id, - "description": description, - "entity_type": entity_type, - } - } - - await self.entities_vdb.upsert(entity_data_for_vdb) - - # 8. Update relationship vector representations - for rel_data in relation_updates.values(): - src = rel_data["src"] - tgt = rel_data["tgt"] - edge_data = rel_data["data"] - - description = edge_data.get("description", "") - keywords = edge_data.get("keywords", "") - source_id = edge_data.get("source_id", "") - weight = float(edge_data.get("weight", 1.0)) - - content = f"{keywords}\t{src}\n{tgt}\n{description}" - relation_id = compute_mdhash_id(src + tgt, prefix="rel-") - - relation_data_for_vdb = { - relation_id: { - "content": content, - "src_id": src, - "tgt_id": tgt, - "source_id": source_id, - "description": description, - "keywords": keywords, - "weight": weight, - } - } - - await self.relationships_vdb.upsert(relation_data_for_vdb) - - # 9. 
Delete source entities - for entity_name in source_entities: - if entity_name == target_entity: - logger.info( - f"Skipping deletion of '{entity_name}' as it's also the target entity" - ) - continue - - # Delete entity node from knowledge graph - await self.chunk_entity_relation_graph.delete_node(entity_name) - - # Delete entity record from vector database - entity_id = compute_mdhash_id(entity_name, prefix="ent-") - await self.entities_vdb.delete([entity_id]) - - logger.info( - f"Deleted source entity '{entity_name}' and its vector embedding from database" - ) - - # 10. Save changes - await self._merge_entities_done() - - logger.info( - f"Successfully merged {len(source_entities)} entities into '{target_entity}'" - ) - return await self.get_entity_info(target_entity, include_vector_data=True) - - except Exception as e: - logger.error(f"Error merging entities: {e}") - raise + def merge_entities( + self, + source_entities: list[str], + target_entity: str, + merge_strategy: dict[str, str] = None, + target_entity_data: dict[str, Any] = None, + ) -> dict[str, Any]: + loop = always_get_an_event_loop() + return loop.run_until_complete( + self.amerge_entities( + source_entities, target_entity, merge_strategy, target_entity_data + ) + ) async def aexport_data( self, @@ -2707,275 +2017,16 @@ class LightRAG: - table: Print formatted tables to console include_vector_data: Whether to include data from the vector database. """ - # Collect data - entities_data = [] - relations_data = [] - relationships_data = [] - - # --- Entities --- - all_entities = await self.chunk_entity_relation_graph.get_all_labels() - for entity_name in all_entities: - entity_info = await self.get_entity_info( - entity_name, include_vector_data=include_vector_data - ) - entity_row = { - "entity_name": entity_name, - "source_id": entity_info["source_id"], - "graph_data": str( - entity_info["graph_data"] - ), # Convert to string to ensure compatibility - } - if include_vector_data and "vector_data" in entity_info: - entity_row["vector_data"] = str(entity_info["vector_data"]) - entities_data.append(entity_row) - - # --- Relations --- - for src_entity in all_entities: - for tgt_entity in all_entities: - if src_entity == tgt_entity: - continue - - edge_exists = await self.chunk_entity_relation_graph.has_edge( - src_entity, tgt_entity - ) - if edge_exists: - relation_info = await self.get_relation_info( - src_entity, tgt_entity, include_vector_data=include_vector_data - ) - relation_row = { - "src_entity": src_entity, - "tgt_entity": tgt_entity, - "source_id": relation_info["source_id"], - "graph_data": str( - relation_info["graph_data"] - ), # Convert to string - } - if include_vector_data and "vector_data" in relation_info: - relation_row["vector_data"] = str(relation_info["vector_data"]) - relations_data.append(relation_row) - - # --- Relationships (from VectorDB) --- - all_relationships = await self.relationships_vdb.client_storage - for rel in all_relationships["data"]: - relationships_data.append( - { - "relationship_id": rel["__id__"], - "data": str(rel), # Convert to string for compatibility - } - ) - - # Export based on format - if file_format == "csv": - # CSV export - with open(output_path, "w", newline="", encoding="utf-8") as csvfile: - # Entities - if entities_data: - csvfile.write("# ENTITIES\n") - writer = csv.DictWriter(csvfile, fieldnames=entities_data[0].keys()) - writer.writeheader() - writer.writerows(entities_data) - csvfile.write("\n\n") - - # Relations - if relations_data: - csvfile.write("# RELATIONS\n") - 
writer = csv.DictWriter( - csvfile, fieldnames=relations_data[0].keys() - ) - writer.writeheader() - writer.writerows(relations_data) - csvfile.write("\n\n") - - # Relationships - if relationships_data: - csvfile.write("# RELATIONSHIPS\n") - writer = csv.DictWriter( - csvfile, fieldnames=relationships_data[0].keys() - ) - writer.writeheader() - writer.writerows(relationships_data) - - elif file_format == "excel": - # Excel export - entities_df = ( - pd.DataFrame(entities_data) if entities_data else pd.DataFrame() - ) - relations_df = ( - pd.DataFrame(relations_data) if relations_data else pd.DataFrame() - ) - relationships_df = ( - pd.DataFrame(relationships_data) - if relationships_data - else pd.DataFrame() - ) - - with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer: - if not entities_df.empty: - entities_df.to_excel(writer, sheet_name="Entities", index=False) - if not relations_df.empty: - relations_df.to_excel(writer, sheet_name="Relations", index=False) - if not relationships_df.empty: - relationships_df.to_excel( - writer, sheet_name="Relationships", index=False - ) - - elif file_format == "md": - # Markdown export - with open(output_path, "w", encoding="utf-8") as mdfile: - mdfile.write("# LightRAG Data Export\n\n") - - # Entities - mdfile.write("## Entities\n\n") - if entities_data: - # Write header - mdfile.write("| " + " | ".join(entities_data[0].keys()) + " |\n") - mdfile.write( - "| " - + " | ".join(["---"] * len(entities_data[0].keys())) - + " |\n" - ) - - # Write rows - for entity in entities_data: - mdfile.write( - "| " + " | ".join(str(v) for v in entity.values()) + " |\n" - ) - mdfile.write("\n\n") - else: - mdfile.write("*No entity data available*\n\n") - - # Relations - mdfile.write("## Relations\n\n") - if relations_data: - # Write header - mdfile.write("| " + " | ".join(relations_data[0].keys()) + " |\n") - mdfile.write( - "| " - + " | ".join(["---"] * len(relations_data[0].keys())) - + " |\n" - ) - - # Write rows - for relation in relations_data: - mdfile.write( - "| " - + " | ".join(str(v) for v in relation.values()) - + " |\n" - ) - mdfile.write("\n\n") - else: - mdfile.write("*No relation data available*\n\n") - - # Relationships - mdfile.write("## Relationships\n\n") - if relationships_data: - # Write header - mdfile.write( - "| " + " | ".join(relationships_data[0].keys()) + " |\n" - ) - mdfile.write( - "| " - + " | ".join(["---"] * len(relationships_data[0].keys())) - + " |\n" - ) - - # Write rows - for relationship in relationships_data: - mdfile.write( - "| " - + " | ".join(str(v) for v in relationship.values()) - + " |\n" - ) - else: - mdfile.write("*No relationship data available*\n\n") - - elif file_format == "txt": - # Plain text export - with open(output_path, "w", encoding="utf-8") as txtfile: - txtfile.write("LIGHTRAG DATA EXPORT\n") - txtfile.write("=" * 80 + "\n\n") - - # Entities - txtfile.write("ENTITIES\n") - txtfile.write("-" * 80 + "\n") - if entities_data: - # Create fixed width columns - col_widths = { - k: max(len(k), max(len(str(e[k])) for e in entities_data)) - for k in entities_data[0] - } - header = " ".join(k.ljust(col_widths[k]) for k in entities_data[0]) - txtfile.write(header + "\n") - txtfile.write("-" * len(header) + "\n") - - # Write rows - for entity in entities_data: - row = " ".join( - str(v).ljust(col_widths[k]) for k, v in entity.items() - ) - txtfile.write(row + "\n") - txtfile.write("\n\n") - else: - txtfile.write("No entity data available\n\n") - - # Relations - txtfile.write("RELATIONS\n") - txtfile.write("-" * 
80 + "\n") - if relations_data: - # Create fixed width columns - col_widths = { - k: max(len(k), max(len(str(r[k])) for r in relations_data)) - for k in relations_data[0] - } - header = " ".join( - k.ljust(col_widths[k]) for k in relations_data[0] - ) - txtfile.write(header + "\n") - txtfile.write("-" * len(header) + "\n") - - # Write rows - for relation in relations_data: - row = " ".join( - str(v).ljust(col_widths[k]) for k, v in relation.items() - ) - txtfile.write(row + "\n") - txtfile.write("\n\n") - else: - txtfile.write("No relation data available\n\n") - - # Relationships - txtfile.write("RELATIONSHIPS\n") - txtfile.write("-" * 80 + "\n") - if relationships_data: - # Create fixed width columns - col_widths = { - k: max(len(k), max(len(str(r[k])) for r in relationships_data)) - for k in relationships_data[0] - } - header = " ".join( - k.ljust(col_widths[k]) for k in relationships_data[0] - ) - txtfile.write(header + "\n") - txtfile.write("-" * len(header) + "\n") - - # Write rows - for relationship in relationships_data: - row = " ".join( - str(v).ljust(col_widths[k]) for k, v in relationship.items() - ) - txtfile.write(row + "\n") - else: - txtfile.write("No relationship data available\n\n") - - else: - raise ValueError( - f"Unsupported file format: {file_format}. " - f"Choose from: csv, excel, md, txt" - ) - if file_format is not None: - print(f"Data exported to: {output_path} with format: {file_format}") - else: - print("Data displayed as table format") + from .utils import aexport_data as utils_aexport_data + + await utils_aexport_data( + self.chunk_entity_relation_graph, + self.entities_vdb, + self.relationships_vdb, + output_path, + file_format, + include_vector_data + ) def export_data( self, @@ -3004,153 +2055,3 @@ class LightRAG: loop.run_until_complete( self.aexport_data(output_path, file_format, include_vector_data) ) - - def merge_entities( - self, - source_entities: list[str], - target_entity: str, - merge_strategy: dict[str, str] = None, - target_entity_data: dict[str, Any] = None, - ) -> dict[str, Any]: - """Synchronously merge multiple entities into one entity. - - Merges multiple source entities into a target entity, handling all relationships, - and updating both the knowledge graph and vector database. - - Args: - source_entities: List of source entity names to merge - target_entity: Name of the target entity after merging - merge_strategy: Merge strategy configuration, e.g. {"description": "concatenate", "entity_type": "keep_first"} - target_entity_data: Dictionary of specific values to set for the target entity, - overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} - - Returns: - Dictionary containing the merged entity information - """ - loop = always_get_an_event_loop() - return loop.run_until_complete( - self.amerge_entities( - source_entities, target_entity, merge_strategy, target_entity_data - ) - ) - - def _merge_entity_attributes( - self, entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] - ) -> dict[str, Any]: - """Merge attributes from multiple entities. 
- - Args: - entity_data_list: List of dictionaries containing entity data - merge_strategy: Merge strategy for each field - - Returns: - Dictionary containing merged entity data - """ - merged_data = {} - - # Collect all possible keys - all_keys = set() - for data in entity_data_list: - all_keys.update(data.keys()) - - # Merge values for each key - for key in all_keys: - # Get all values for this key - values = [data.get(key) for data in entity_data_list if data.get(key)] - - if not values: - continue - - # Merge values according to strategy - strategy = merge_strategy.get(key, "keep_first") - - if strategy == "concatenate": - merged_data[key] = "\n\n".join(values) - elif strategy == "keep_first": - merged_data[key] = values[0] - elif strategy == "keep_last": - merged_data[key] = values[-1] - elif strategy == "join_unique": - # Handle fields separated by GRAPH_FIELD_SEP - unique_items = set() - for value in values: - items = value.split(GRAPH_FIELD_SEP) - unique_items.update(items) - merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) - else: - # Default strategy - merged_data[key] = values[0] - - return merged_data - - def _merge_relation_attributes( - self, relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] - ) -> dict[str, Any]: - """Merge attributes from multiple relationships. - - Args: - relation_data_list: List of dictionaries containing relationship data - merge_strategy: Merge strategy for each field - - Returns: - Dictionary containing merged relationship data - """ - merged_data = {} - - # Collect all possible keys - all_keys = set() - for data in relation_data_list: - all_keys.update(data.keys()) - - # Merge values for each key - for key in all_keys: - # Get all values for this key - values = [ - data.get(key) - for data in relation_data_list - if data.get(key) is not None - ] - - if not values: - continue - - # Merge values according to strategy - strategy = merge_strategy.get(key, "keep_first") - - if strategy == "concatenate": - merged_data[key] = "\n\n".join(str(v) for v in values) - elif strategy == "keep_first": - merged_data[key] = values[0] - elif strategy == "keep_last": - merged_data[key] = values[-1] - elif strategy == "join_unique": - # Handle fields separated by GRAPH_FIELD_SEP - unique_items = set() - for value in values: - items = str(value).split(GRAPH_FIELD_SEP) - unique_items.update(items) - merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) - elif strategy == "max": - # For numeric fields like weight - try: - merged_data[key] = max(float(v) for v in values) - except (ValueError, TypeError): - merged_data[key] = values[0] - else: - # Default strategy - merged_data[key] = values[0] - - return merged_data - - async def _merge_entities_done(self) -> None: - """Callback after entity merging is complete, ensures updates are persisted""" - await asyncio.gather( - *[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ # type: ignore - self.entities_vdb, - self.relationships_vdb, - self.chunk_entity_relation_graph, - ] - ] - ) diff --git a/lightrag/utils.py b/lightrag/utils.py index 43d82196..18d1c7aa 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -893,6 +893,365 @@ def always_get_an_event_loop() -> asyncio.AbstractEventLoop: return new_loop +async def aexport_data( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + output_path: str, + file_format: str = "csv", + include_vector_data: bool = False, +) -> None: + """ + Asynchronously exports all entities, relations, and 
relationships to various formats. + + Args: + chunk_entity_relation_graph: Graph storage instance for entities and relations + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + output_path: The path to the output file (including extension). + file_format: Output format - "csv", "excel", "md", "txt". + - csv: Comma-separated values file + - excel: Microsoft Excel file with multiple sheets + - md: Markdown tables + - txt: Plain text formatted output + include_vector_data: Whether to include data from the vector database. + """ + # Collect data + entities_data = [] + relations_data = [] + relationships_data = [] + + # --- Entities --- + all_entities = await chunk_entity_relation_graph.get_all_labels() + for entity_name in all_entities: + # Get entity information from graph + node_data = await chunk_entity_relation_graph.get_node(entity_name) + source_id = node_data.get("source_id") if node_data else None + + entity_info = { + "graph_data": node_data, + "source_id": source_id, + } + + # Optional: Get vector database information + if include_vector_data: + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + vector_data = await entities_vdb.get_by_id(entity_id) + entity_info["vector_data"] = vector_data + + entity_row = { + "entity_name": entity_name, + "source_id": source_id, + "graph_data": str(entity_info["graph_data"]), # Convert to string to ensure compatibility + } + if include_vector_data and "vector_data" in entity_info: + entity_row["vector_data"] = str(entity_info["vector_data"]) + entities_data.append(entity_row) + + # --- Relations --- + for src_entity in all_entities: + for tgt_entity in all_entities: + if src_entity == tgt_entity: + continue + + edge_exists = await chunk_entity_relation_graph.has_edge( + src_entity, tgt_entity + ) + if edge_exists: + # Get edge information from graph + edge_data = await chunk_entity_relation_graph.get_edge( + src_entity, tgt_entity + ) + source_id = edge_data.get("source_id") if edge_data else None + + relation_info = { + "graph_data": edge_data, + "source_id": source_id, + } + + # Optional: Get vector database information + if include_vector_data: + rel_id = compute_mdhash_id(src_entity + tgt_entity, prefix="rel-") + vector_data = await relationships_vdb.get_by_id(rel_id) + relation_info["vector_data"] = vector_data + + relation_row = { + "src_entity": src_entity, + "tgt_entity": tgt_entity, + "source_id": relation_info["source_id"], + "graph_data": str(relation_info["graph_data"]), # Convert to string + } + if include_vector_data and "vector_data" in relation_info: + relation_row["vector_data"] = str(relation_info["vector_data"]) + relations_data.append(relation_row) + + # --- Relationships (from VectorDB) --- + all_relationships = await relationships_vdb.client_storage + for rel in all_relationships["data"]: + relationships_data.append( + { + "relationship_id": rel["__id__"], + "data": str(rel), # Convert to string for compatibility + } + ) + + # Export based on format + if file_format == "csv": + # CSV export + with open(output_path, "w", newline="", encoding="utf-8") as csvfile: + # Entities + if entities_data: + csvfile.write("# ENTITIES\n") + writer = csv.DictWriter(csvfile, fieldnames=entities_data[0].keys()) + writer.writeheader() + writer.writerows(entities_data) + csvfile.write("\n\n") + + # Relations + if relations_data: + csvfile.write("# RELATIONS\n") + writer = csv.DictWriter( + csvfile, fieldnames=relations_data[0].keys() + ) + writer.writeheader() + 
writer.writerows(relations_data) + csvfile.write("\n\n") + + # Relationships + if relationships_data: + csvfile.write("# RELATIONSHIPS\n") + writer = csv.DictWriter( + csvfile, fieldnames=relationships_data[0].keys() + ) + writer.writeheader() + writer.writerows(relationships_data) + + elif file_format == "excel": + # Excel export + import pandas as pd + + entities_df = ( + pd.DataFrame(entities_data) if entities_data else pd.DataFrame() + ) + relations_df = ( + pd.DataFrame(relations_data) if relations_data else pd.DataFrame() + ) + relationships_df = ( + pd.DataFrame(relationships_data) + if relationships_data + else pd.DataFrame() + ) + + with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer: + if not entities_df.empty: + entities_df.to_excel(writer, sheet_name="Entities", index=False) + if not relations_df.empty: + relations_df.to_excel(writer, sheet_name="Relations", index=False) + if not relationships_df.empty: + relationships_df.to_excel( + writer, sheet_name="Relationships", index=False + ) + + elif file_format == "md": + # Markdown export + with open(output_path, "w", encoding="utf-8") as mdfile: + mdfile.write("# LightRAG Data Export\n\n") + + # Entities + mdfile.write("## Entities\n\n") + if entities_data: + # Write header + mdfile.write("| " + " | ".join(entities_data[0].keys()) + " |\n") + mdfile.write( + "| " + + " | ".join(["---"] * len(entities_data[0].keys())) + + " |\n" + ) + + # Write rows + for entity in entities_data: + mdfile.write( + "| " + " | ".join(str(v) for v in entity.values()) + " |\n" + ) + mdfile.write("\n\n") + else: + mdfile.write("*No entity data available*\n\n") + + # Relations + mdfile.write("## Relations\n\n") + if relations_data: + # Write header + mdfile.write("| " + " | ".join(relations_data[0].keys()) + " |\n") + mdfile.write( + "| " + + " | ".join(["---"] * len(relations_data[0].keys())) + + " |\n" + ) + + # Write rows + for relation in relations_data: + mdfile.write( + "| " + + " | ".join(str(v) for v in relation.values()) + + " |\n" + ) + mdfile.write("\n\n") + else: + mdfile.write("*No relation data available*\n\n") + + # Relationships + mdfile.write("## Relationships\n\n") + if relationships_data: + # Write header + mdfile.write( + "| " + " | ".join(relationships_data[0].keys()) + " |\n" + ) + mdfile.write( + "| " + + " | ".join(["---"] * len(relationships_data[0].keys())) + + " |\n" + ) + + # Write rows + for relationship in relationships_data: + mdfile.write( + "| " + + " | ".join(str(v) for v in relationship.values()) + + " |\n" + ) + else: + mdfile.write("*No relationship data available*\n\n") + + elif file_format == "txt": + # Plain text export + with open(output_path, "w", encoding="utf-8") as txtfile: + txtfile.write("LIGHTRAG DATA EXPORT\n") + txtfile.write("=" * 80 + "\n\n") + + # Entities + txtfile.write("ENTITIES\n") + txtfile.write("-" * 80 + "\n") + if entities_data: + # Create fixed width columns + col_widths = { + k: max(len(k), max(len(str(e[k])) for e in entities_data)) + for k in entities_data[0] + } + header = " ".join(k.ljust(col_widths[k]) for k in entities_data[0]) + txtfile.write(header + "\n") + txtfile.write("-" * len(header) + "\n") + + # Write rows + for entity in entities_data: + row = " ".join( + str(v).ljust(col_widths[k]) for k, v in entity.items() + ) + txtfile.write(row + "\n") + txtfile.write("\n\n") + else: + txtfile.write("No entity data available\n\n") + + # Relations + txtfile.write("RELATIONS\n") + txtfile.write("-" * 80 + "\n") + if relations_data: + # Create fixed width columns + col_widths = 
{ + k: max(len(k), max(len(str(r[k])) for r in relations_data)) + for k in relations_data[0] + } + header = " ".join( + k.ljust(col_widths[k]) for k in relations_data[0] + ) + txtfile.write(header + "\n") + txtfile.write("-" * len(header) + "\n") + + # Write rows + for relation in relations_data: + row = " ".join( + str(v).ljust(col_widths[k]) for k, v in relation.items() + ) + txtfile.write(row + "\n") + txtfile.write("\n\n") + else: + txtfile.write("No relation data available\n\n") + + # Relationships + txtfile.write("RELATIONSHIPS\n") + txtfile.write("-" * 80 + "\n") + if relationships_data: + # Create fixed width columns + col_widths = { + k: max(len(k), max(len(str(r[k])) for r in relationships_data)) + for k in relationships_data[0] + } + header = " ".join( + k.ljust(col_widths[k]) for k in relationships_data[0] + ) + txtfile.write(header + "\n") + txtfile.write("-" * len(header) + "\n") + + # Write rows + for relationship in relationships_data: + row = " ".join( + str(v).ljust(col_widths[k]) for k, v in relationship.items() + ) + txtfile.write(row + "\n") + else: + txtfile.write("No relationship data available\n\n") + + else: + raise ValueError( + f"Unsupported file format: {file_format}. " + f"Choose from: csv, excel, md, txt" + ) + if file_format is not None: + print(f"Data exported to: {output_path} with format: {file_format}") + else: + print("Data displayed as table format") + + +def export_data( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + output_path: str, + file_format: str = "csv", + include_vector_data: bool = False, +) -> None: + """ + Synchronously exports all entities, relations, and relationships to various formats. + + Args: + chunk_entity_relation_graph: Graph storage instance for entities and relations + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + output_path: The path to the output file (including extension). + file_format: Output format - "csv", "excel", "md", "txt". + - csv: Comma-separated values file + - excel: Microsoft Excel file with multiple sheets + - md: Markdown tables + - txt: Plain text formatted output + include_vector_data: Whether to include data from the vector database. + """ + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + loop.run_until_complete( + aexport_data( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + output_path, + file_format, + include_vector_data + ) + ) + + def lazy_external_import(module_name: str, class_name: str) -> Callable[..., Any]: """Lazily import a class from an external module based on the package of the caller.""" # Get the caller's module and package diff --git a/lightrag/utils_graph.py b/lightrag/utils_graph.py new file mode 100644 index 00000000..3382dd41 --- /dev/null +++ b/lightrag/utils_graph.py @@ -0,0 +1,1031 @@ +from __future__ import annotations + +import asyncio +from typing import Any, cast + +from .kg.shared_storage import get_graph_db_lock +from .prompt import GRAPH_FIELD_SEP +from .utils import compute_mdhash_id, logger, StorageNameSpace + +async def adelete_by_entity( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str +) -> None: + """Asynchronously delete an entity and all its relationships. 
+ + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + entity_name: Name of the entity to delete + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + await entities_vdb.delete_entity(entity_name) + await relationships_vdb.delete_entity_relation(entity_name) + await chunk_entity_relation_graph.delete_node(entity_name) + + logger.info( + f"Entity '{entity_name}' and its relationships have been deleted." + ) + await _delete_by_entity_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) + except Exception as e: + logger.error(f"Error while deleting entity '{entity_name}': {e}") + +async def _delete_by_entity_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) -> None: + """Callback after entity deletion is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + ] + ] + ) + +async def adelete_by_relation( + chunk_entity_relation_graph, + relationships_vdb, + source_entity: str, + target_entity: str +) -> None: + """Asynchronously delete a relation between two entities. + + Args: + chunk_entity_relation_graph: Graph storage instance + relationships_vdb: Vector database storage for relationships + source_entity: Name of the source entity + target_entity: Name of the target entity + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Check if the relation exists + edge_exists = await chunk_entity_relation_graph.has_edge( + source_entity, target_entity + ) + if not edge_exists: + logger.warning( + f"Relation from '{source_entity}' to '{target_entity}' does not exist" + ) + return + + # Delete relation from vector database + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + await relationships_vdb.delete([relation_id]) + + # Delete relation from knowledge graph + await chunk_entity_relation_graph.remove_edges( + [(source_entity, target_entity)] + ) + + logger.info( + f"Successfully deleted relation from '{source_entity}' to '{target_entity}'" + ) + await _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) + except Exception as e: + logger.error( + f"Error while deleting relation from '{source_entity}' to '{target_entity}': {e}" + ) + +async def _delete_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: + """Callback after relation deletion is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + relationships_vdb, + chunk_entity_relation_graph, + ] + ] + ) + +async def aedit_entity( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + updated_data: dict[str, str], + allow_rename: bool = True +) -> dict[str, Any]: + """Asynchronously edit entity information. + + Updates entity information in the knowledge graph and re-embeds the entity in the vector database. 
+ + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + entity_name: Name of the entity to edit + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"} + allow_rename: Whether to allow entity renaming, defaults to True + + Returns: + Dictionary containing updated entity information + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # 1. Get current entity information + node_exists = await chunk_entity_relation_graph.has_node(entity_name) + if not node_exists: + raise ValueError(f"Entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + + # Check if entity is being renamed + new_entity_name = updated_data.get("entity_name", entity_name) + is_renaming = new_entity_name != entity_name + + # If renaming, check if new name already exists + if is_renaming: + if not allow_rename: + raise ValueError( + "Entity renaming is not allowed. Set allow_rename=True to enable this feature" + ) + + existing_node = await chunk_entity_relation_graph.has_node( + new_entity_name + ) + if existing_node: + raise ValueError( + f"Entity name '{new_entity_name}' already exists, cannot rename" + ) + + # 2. Update entity information in the graph + new_node_data = {**node_data, **updated_data} + new_node_data["entity_id"] = new_entity_name + + if "entity_name" in new_node_data: + del new_node_data[ + "entity_name" + ] # Node data should not contain entity_name field + + # If renaming entity + if is_renaming: + logger.info(f"Renaming entity '{entity_name}' to '{new_entity_name}'") + + # Create new entity + await chunk_entity_relation_graph.upsert_node( + new_entity_name, new_node_data + ) + + # Store relationships that need to be updated + relations_to_update = [] + relations_to_delete = [] + # Get all edges related to the original entity + edges = await chunk_entity_relation_graph.get_node_edges( + entity_name + ) + if edges: + # Recreate edges for the new entity + for source, target in edges: + edge_data = await chunk_entity_relation_graph.get_edge( + source, target + ) + if edge_data: + relations_to_delete.append( + compute_mdhash_id(source + target, prefix="rel-") + ) + relations_to_delete.append( + compute_mdhash_id(target + source, prefix="rel-") + ) + if source == entity_name: + await chunk_entity_relation_graph.upsert_edge( + new_entity_name, target, edge_data + ) + relations_to_update.append( + (new_entity_name, target, edge_data) + ) + else: # target == entity_name + await chunk_entity_relation_graph.upsert_edge( + source, new_entity_name, edge_data + ) + relations_to_update.append( + (source, new_entity_name, edge_data) + ) + + # Delete old entity + await chunk_entity_relation_graph.delete_node(entity_name) + + # Delete old entity record from vector database + old_entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await entities_vdb.delete([old_entity_id]) + logger.info( + f"Deleted old entity '{entity_name}' and its vector embedding from database" + ) + + # Delete old relation records from vector database + await relationships_vdb.delete(relations_to_delete) + logger.info( + f"Deleted {len(relations_to_delete)} relation records for entity '{entity_name}' from vector database" + ) + + # Update relationship vector representations + 
for src, tgt, edge_data in relations_to_update: + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + # Create new content for embedding + content = f"{src}\t{tgt}\n{keywords}\n{description}" + + # Calculate relationship ID + relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": src, + "tgt_id": tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await relationships_vdb.upsert(relation_data) + + # Update working entity name to new name + entity_name = new_entity_name + else: + # If not renaming, directly update node data + await chunk_entity_relation_graph.upsert_node( + entity_name, new_node_data + ) + + # 3. Recalculate entity's vector representation and update vector database + description = new_node_data.get("description", "") + source_id = new_node_data.get("source_id", "") + entity_type = new_node_data.get("entity_type", "") + content = entity_name + "\n" + description + + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + # Update vector database + await entities_vdb.upsert(entity_data) + + # 4. Save changes + await _edit_entity_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) + + logger.info(f"Entity '{entity_name}' successfully updated") + return await get_entity_info(chunk_entity_relation_graph, entities_vdb, entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while editing entity '{entity_name}': {e}") + raise + +async def _edit_entity_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) -> None: + """Callback after entity editing is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + ] + ] + ) + +async def aedit_relation( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entity: str, + target_entity: str, + updated_data: dict[str, Any] +) -> dict[str, Any]: + """Asynchronously edit relation information. + + Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database. + + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + source_entity: Name of the source entity + target_entity: Name of the target entity + updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "keywords": "new keywords"} + + Returns: + Dictionary containing updated relation information + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # 1. 
Get current relation information + edge_exists = await chunk_entity_relation_graph.has_edge( + source_entity, target_entity + ) + if not edge_exists: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' does not exist" + ) + edge_data = await chunk_entity_relation_graph.get_edge( + source_entity, target_entity + ) + # Important: First delete the old relation record from the vector database + old_relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + await relationships_vdb.delete([old_relation_id]) + logger.info( + f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}" + ) + + # 2. Update relation information in the graph + new_edge_data = {**edge_data, **updated_data} + await chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, new_edge_data + ) + + # 3. Recalculate relation's vector representation and update vector database + description = new_edge_data.get("description", "") + keywords = new_edge_data.get("keywords", "") + source_id = new_edge_data.get("source_id", "") + weight = float(new_edge_data.get("weight", 1.0)) + + # Create content for embedding + content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}" + + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + + # Prepare data for vector database update + relation_data = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await relationships_vdb.upsert(relation_data) + + # 4. Save changes + await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) + + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully updated" + ) + return await get_relation_info( + chunk_entity_relation_graph, relationships_vdb, source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while editing relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise + +async def _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) -> None: + """Callback after relation editing is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + relationships_vdb, + chunk_entity_relation_graph, + ] + ] + ) + +async def acreate_entity( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + entity_name: str, + entity_data: dict[str, Any] +) -> dict[str, Any]: + """Asynchronously create a new entity. + + Creates a new entity in the knowledge graph and adds it to the vector database. + + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + entity_name: Name of the new entity + entity_data: Dictionary containing entity attributes, e.g. 
{"description": "description", "entity_type": "type"} + + Returns: + Dictionary containing created entity information + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Check if entity already exists + existing_node = await chunk_entity_relation_graph.has_node(entity_name) + if existing_node: + raise ValueError(f"Entity '{entity_name}' already exists") + + # Prepare node data with defaults if missing + node_data = { + "entity_id": entity_name, + "entity_type": entity_data.get("entity_type", "UNKNOWN"), + "description": entity_data.get("description", ""), + "source_id": entity_data.get("source_id", "manual"), + } + + # Add entity to knowledge graph + await chunk_entity_relation_graph.upsert_node(entity_name, node_data) + + # Prepare content for entity + description = node_data.get("description", "") + source_id = node_data.get("source_id", "") + entity_type = node_data.get("entity_type", "") + content = entity_name + "\n" + description + + # Calculate entity ID + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + + # Prepare data for vector database update + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": entity_name, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + # Update vector database + await entities_vdb.upsert(entity_data_for_vdb) + + # Save changes + await _edit_entity_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) + + logger.info(f"Entity '{entity_name}' successfully created") + return await get_entity_info(chunk_entity_relation_graph, entities_vdb, entity_name, include_vector_data=True) + except Exception as e: + logger.error(f"Error while creating entity '{entity_name}': {e}") + raise + +async def acreate_relation( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entity: str, + target_entity: str, + relation_data: dict[str, Any] +) -> dict[str, Any]: + """Asynchronously create a new relation between entities. + + Creates a new relation (edge) in the knowledge graph and adds it to the vector database. + + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + source_entity: Name of the source entity + target_entity: Name of the target entity + relation_data: Dictionary containing relation attributes, e.g. 
{"description": "description", "keywords": "keywords"} + + Returns: + Dictionary containing created relation information + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Check if both entities exist + source_exists = await chunk_entity_relation_graph.has_node( + source_entity + ) + target_exists = await chunk_entity_relation_graph.has_node( + target_entity + ) + + if not source_exists: + raise ValueError(f"Source entity '{source_entity}' does not exist") + if not target_exists: + raise ValueError(f"Target entity '{target_entity}' does not exist") + + # Check if relation already exists + existing_edge = await chunk_entity_relation_graph.has_edge( + source_entity, target_entity + ) + if existing_edge: + raise ValueError( + f"Relation from '{source_entity}' to '{target_entity}' already exists" + ) + + # Prepare edge data with defaults if missing + edge_data = { + "description": relation_data.get("description", ""), + "keywords": relation_data.get("keywords", ""), + "source_id": relation_data.get("source_id", "manual"), + "weight": float(relation_data.get("weight", 1.0)), + } + + # Add relation to knowledge graph + await chunk_entity_relation_graph.upsert_edge( + source_entity, target_entity, edge_data + ) + + # Prepare content for embedding + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = edge_data.get("weight", 1.0) + + # Create content for embedding + content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}" + + # Calculate relation ID + relation_id = compute_mdhash_id( + source_entity + target_entity, prefix="rel-" + ) + + # Prepare data for vector database update + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": source_entity, + "tgt_id": target_entity, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + # Update vector database + await relationships_vdb.upsert(relation_data_for_vdb) + + # Save changes + await _edit_relation_done(relationships_vdb, chunk_entity_relation_graph) + + logger.info( + f"Relation from '{source_entity}' to '{target_entity}' successfully created" + ) + return await get_relation_info( + chunk_entity_relation_graph, relationships_vdb, source_entity, target_entity, include_vector_data=True + ) + except Exception as e: + logger.error( + f"Error while creating relation from '{source_entity}' to '{target_entity}': {e}" + ) + raise + +async def amerge_entities( + chunk_entity_relation_graph, + entities_vdb, + relationships_vdb, + source_entities: list[str], + target_entity: str, + merge_strategy: dict[str, str] = None, + target_entity_data: dict[str, Any] = None, +) -> dict[str, Any]: + """Asynchronously merge multiple entities into one entity. + + Merges multiple source entities into a target entity, handling all relationships, + and updating both the knowledge graph and vector database. + + Args: + chunk_entity_relation_graph: Graph storage instance + entities_vdb: Vector database storage for entities + relationships_vdb: Vector database storage for relationships + source_entities: List of source entity names to merge + target_entity: Name of the target entity after merging + merge_strategy: Merge strategy configuration, e.g. 
{"description": "concatenate", "entity_type": "keep_first"} + Supported strategies: + - "concatenate": Concatenate all values (for text fields) + - "keep_first": Keep the first non-empty value + - "keep_last": Keep the last non-empty value + - "join_unique": Join all unique values (for fields separated by delimiter) + target_entity_data: Dictionary of specific values to set for the target entity, + overriding any merged values, e.g. {"description": "custom description", "entity_type": "PERSON"} + + Returns: + Dictionary containing the merged entity information + """ + graph_db_lock = get_graph_db_lock(enable_logging=False) + # Use graph database lock to ensure atomic graph and vector db operations + async with graph_db_lock: + try: + # Default merge strategy + default_strategy = { + "description": "concatenate", + "entity_type": "keep_first", + "source_id": "join_unique", + } + + merge_strategy = ( + default_strategy + if merge_strategy is None + else {**default_strategy, **merge_strategy} + ) + target_entity_data = ( + {} if target_entity_data is None else target_entity_data + ) + + # 1. Check if all source entities exist + source_entities_data = {} + for entity_name in source_entities: + node_exists = await chunk_entity_relation_graph.has_node( + entity_name + ) + if not node_exists: + raise ValueError(f"Source entity '{entity_name}' does not exist") + node_data = await chunk_entity_relation_graph.get_node(entity_name) + source_entities_data[entity_name] = node_data + + # 2. Check if target entity exists and get its data if it does + target_exists = await chunk_entity_relation_graph.has_node( + target_entity + ) + existing_target_entity_data = {} + if target_exists: + existing_target_entity_data = ( + await chunk_entity_relation_graph.get_node(target_entity) + ) + logger.info( + f"Target entity '{target_entity}' already exists, will merge data" + ) + + # 3. Merge entity data + merged_entity_data = _merge_entity_attributes( + list(source_entities_data.values()) + + ([existing_target_entity_data] if target_exists else []), + merge_strategy, + ) + + # Apply any explicitly provided target entity data (overrides merged data) + for key, value in target_entity_data.items(): + merged_entity_data[key] = value + + # 4. Get all relationships of the source entities + all_relations = [] + for entity_name in source_entities: + # Get all relationships of the source entities + edges = await chunk_entity_relation_graph.get_node_edges( + entity_name + ) + if edges: + for src, tgt in edges: + # Ensure src is the current entity + if src == entity_name: + edge_data = await chunk_entity_relation_graph.get_edge( + src, tgt + ) + all_relations.append((src, tgt, edge_data)) + + # 5. Create or update the target entity + merged_entity_data["entity_id"] = target_entity + if not target_exists: + await chunk_entity_relation_graph.upsert_node( + target_entity, merged_entity_data + ) + logger.info(f"Created new target entity '{target_entity}'") + else: + await chunk_entity_relation_graph.upsert_node( + target_entity, merged_entity_data + ) + logger.info(f"Updated existing target entity '{target_entity}'") + + # 6. 
Recreate all relationships, pointing to the target entity + relation_updates = {} # Track relationships that need to be merged + relations_to_delete = [] + + for src, tgt, edge_data in all_relations: + relations_to_delete.append(compute_mdhash_id(src + tgt, prefix="rel-")) + relations_to_delete.append(compute_mdhash_id(tgt + src, prefix="rel-")) + new_src = target_entity if src in source_entities else src + new_tgt = target_entity if tgt in source_entities else tgt + + # Skip relationships between source entities to avoid self-loops + if new_src == new_tgt: + logger.info( + f"Skipping relationship between source entities: {src} -> {tgt} to avoid self-loop" + ) + continue + + # Check if the same relationship already exists + relation_key = f"{new_src}|{new_tgt}" + if relation_key in relation_updates: + # Merge relationship data + existing_data = relation_updates[relation_key]["data"] + merged_relation = _merge_relation_attributes( + [existing_data, edge_data], + { + "description": "concatenate", + "keywords": "join_unique", + "source_id": "join_unique", + "weight": "max", + }, + ) + relation_updates[relation_key]["data"] = merged_relation + logger.info( + f"Merged duplicate relationship: {new_src} -> {new_tgt}" + ) + else: + relation_updates[relation_key] = { + "src": new_src, + "tgt": new_tgt, + "data": edge_data.copy(), + } + + # Apply relationship updates + for rel_data in relation_updates.values(): + await chunk_entity_relation_graph.upsert_edge( + rel_data["src"], rel_data["tgt"], rel_data["data"] + ) + logger.info( + f"Created or updated relationship: {rel_data['src']} -> {rel_data['tgt']}" + ) + + # Delete relationships records from vector database + await relationships_vdb.delete(relations_to_delete) + logger.info( + f"Deleted {len(relations_to_delete)} relation records for entity from vector database" + ) + + # 7. Update entity vector representation + description = merged_entity_data.get("description", "") + source_id = merged_entity_data.get("source_id", "") + entity_type = merged_entity_data.get("entity_type", "") + content = target_entity + "\n" + description + + entity_id = compute_mdhash_id(target_entity, prefix="ent-") + entity_data_for_vdb = { + entity_id: { + "content": content, + "entity_name": target_entity, + "source_id": source_id, + "description": description, + "entity_type": entity_type, + } + } + + await entities_vdb.upsert(entity_data_for_vdb) + + # 8. Update relationship vector representations + for rel_data in relation_updates.values(): + src = rel_data["src"] + tgt = rel_data["tgt"] + edge_data = rel_data["data"] + + description = edge_data.get("description", "") + keywords = edge_data.get("keywords", "") + source_id = edge_data.get("source_id", "") + weight = float(edge_data.get("weight", 1.0)) + + content = f"{keywords}\t{src}\n{tgt}\n{description}" + relation_id = compute_mdhash_id(src + tgt, prefix="rel-") + + relation_data_for_vdb = { + relation_id: { + "content": content, + "src_id": src, + "tgt_id": tgt, + "source_id": source_id, + "description": description, + "keywords": keywords, + "weight": weight, + } + } + + await relationships_vdb.upsert(relation_data_for_vdb) + + # 9. 
Delete source entities + for entity_name in source_entities: + if entity_name == target_entity: + logger.info( + f"Skipping deletion of '{entity_name}' as it's also the target entity" + ) + continue + + # Delete entity node from knowledge graph + await chunk_entity_relation_graph.delete_node(entity_name) + + # Delete entity record from vector database + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + await entities_vdb.delete([entity_id]) + + logger.info( + f"Deleted source entity '{entity_name}' and its vector embedding from database" + ) + + # 10. Save changes + await _merge_entities_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) + + logger.info( + f"Successfully merged {len(source_entities)} entities into '{target_entity}'" + ) + return await get_entity_info(chunk_entity_relation_graph, entities_vdb, target_entity, include_vector_data=True) + + except Exception as e: + logger.error(f"Error merging entities: {e}") + raise + +def _merge_entity_attributes( + entity_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] +) -> dict[str, Any]: + """Merge attributes from multiple entities. + + Args: + entity_data_list: List of dictionaries containing entity data + merge_strategy: Merge strategy for each field + + Returns: + Dictionary containing merged entity data + """ + merged_data = {} + + # Collect all possible keys + all_keys = set() + for data in entity_data_list: + all_keys.update(data.keys()) + + # Merge values for each key + for key in all_keys: + # Get all values for this key + values = [data.get(key) for data in entity_data_list if data.get(key)] + + if not values: + continue + + # Merge values according to strategy + strategy = merge_strategy.get(key, "keep_first") + + if strategy == "concatenate": + merged_data[key] = "\n\n".join(values) + elif strategy == "keep_first": + merged_data[key] = values[0] + elif strategy == "keep_last": + merged_data[key] = values[-1] + elif strategy == "join_unique": + # Handle fields separated by GRAPH_FIELD_SEP + unique_items = set() + for value in values: + items = value.split(GRAPH_FIELD_SEP) + unique_items.update(items) + merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + else: + # Default strategy + merged_data[key] = values[0] + + return merged_data + +def _merge_relation_attributes( + relation_data_list: list[dict[str, Any]], merge_strategy: dict[str, str] +) -> dict[str, Any]: + """Merge attributes from multiple relationships. 
+ + Args: + relation_data_list: List of dictionaries containing relationship data + merge_strategy: Merge strategy for each field + + Returns: + Dictionary containing merged relationship data + """ + merged_data = {} + + # Collect all possible keys + all_keys = set() + for data in relation_data_list: + all_keys.update(data.keys()) + + # Merge values for each key + for key in all_keys: + # Get all values for this key + values = [ + data.get(key) + for data in relation_data_list + if data.get(key) is not None + ] + + if not values: + continue + + # Merge values according to strategy + strategy = merge_strategy.get(key, "keep_first") + + if strategy == "concatenate": + merged_data[key] = "\n\n".join(str(v) for v in values) + elif strategy == "keep_first": + merged_data[key] = values[0] + elif strategy == "keep_last": + merged_data[key] = values[-1] + elif strategy == "join_unique": + # Handle fields separated by GRAPH_FIELD_SEP + unique_items = set() + for value in values: + items = str(value).split(GRAPH_FIELD_SEP) + unique_items.update(items) + merged_data[key] = GRAPH_FIELD_SEP.join(unique_items) + elif strategy == "max": + # For numeric fields like weight + try: + merged_data[key] = max(float(v) for v in values) + except (ValueError, TypeError): + merged_data[key] = values[0] + else: + # Default strategy + merged_data[key] = values[0] + + return merged_data + +async def _merge_entities_done(entities_vdb, relationships_vdb, chunk_entity_relation_graph) -> None: + """Callback after entity merging is complete, ensures updates are persisted""" + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ # type: ignore + entities_vdb, + relationships_vdb, + chunk_entity_relation_graph, + ] + ] + ) + +async def get_entity_info( + chunk_entity_relation_graph, + entities_vdb, + entity_name: str, + include_vector_data: bool = False +) -> dict[str, str | None | dict[str, str]]: + """Get detailed information of an entity""" + + # Get information from the graph + node_data = await chunk_entity_relation_graph.get_node(entity_name) + source_id = node_data.get("source_id") if node_data else None + + result: dict[str, str | None | dict[str, str]] = { + "entity_name": entity_name, + "source_id": source_id, + "graph_data": node_data, + } + + # Optional: Get vector database information + if include_vector_data: + entity_id = compute_mdhash_id(entity_name, prefix="ent-") + vector_data = await entities_vdb.get_by_id(entity_id) + result["vector_data"] = vector_data + + return result + +async def get_relation_info( + chunk_entity_relation_graph, + relationships_vdb, + src_entity: str, + tgt_entity: str, + include_vector_data: bool = False +) -> dict[str, str | None | dict[str, str]]: + """Get detailed information of a relationship""" + + # Get information from the graph + edge_data = await chunk_entity_relation_graph.get_edge( + src_entity, tgt_entity + ) + source_id = edge_data.get("source_id") if edge_data else None + + result: dict[str, str | None | dict[str, str]] = { + "src_entity": src_entity, + "tgt_entity": tgt_entity, + "source_id": source_id, + "graph_data": edge_data, + } + + # Optional: Get vector database information + if include_vector_data: + rel_id = compute_mdhash_id(src_entity + tgt_entity, prefix="rel-") + vector_data = await relationships_vdb.get_by_id(rel_id) + result["vector_data"] = vector_data + + return result
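Usage sketch (not part of this diff): the helpers added in lightrag/utils_graph.py are plain module-level coroutines that take the graph and vector storages explicitly. The hypothetical walkthrough below exercises the create / edit / merge / inspect cycle; it assumes an already-initialized LightRAG instance named `rag` whose `chunk_entity_relation_graph`, `entities_vdb`, and `relationships_vdb` attributes are handed through, and the entity names and attribute values are purely illustrative.

# Not part of this diff: a hypothetical end-to-end sketch of the new utils_graph API.
# Assumes `rag` is an initialized LightRAG instance; its storage attributes are
# passed explicitly, matching the signatures of the module-level helpers above.
from lightrag.utils_graph import (
    acreate_entity,
    acreate_relation,
    aedit_entity,
    amerge_entities,
    get_entity_info,
)


async def maintain_graph(rag) -> None:
    graph = rag.chunk_entity_relation_graph
    entities = rag.entities_vdb
    relations = rag.relationships_vdb

    # Create two entities; omitted fields fall back to the documented defaults
    # ("UNKNOWN" entity_type, "manual" source_id).
    await acreate_entity(graph, entities, relations, "Ada Lovelace",
                         {"description": "Mathematician", "entity_type": "PERSON"})
    await acreate_entity(graph, entities, relations, "Analytical Engine",
                         {"description": "Early mechanical computer", "entity_type": "ARTIFACT"})

    # Connect them; both endpoints must already exist or a ValueError is raised.
    await acreate_relation(graph, entities, relations,
                           "Ada Lovelace", "Analytical Engine",
                           {"description": "wrote programs for", "keywords": "programming"})

    # Rename an entity; the node is re-created under the new name and its edges
    # are re-attached and re-embedded.
    await aedit_entity(graph, entities, relations, "Ada Lovelace",
                       {"entity_name": "Ada King", "description": "Mathematician and writer"})

    # Merge duplicates into a single target node (created if it does not exist);
    # descriptions are concatenated, source_id values are de-duplicated.
    await acreate_entity(graph, entities, relations, "A. Lovelace",
                         {"description": "Author of the first published algorithm"})
    await amerge_entities(graph, entities, relations,
                          source_entities=["Ada King", "A. Lovelace"],
                          target_entity="Ada Lovelace",
                          merge_strategy={"description": "concatenate"})

    # Inspect the merged node, including its stored vector record.
    info = await get_entity_info(graph, entities, "Ada Lovelace", include_vector_data=True)
    print(info["graph_data"])

# Run with: asyncio.run(maintain_graph(rag)) once `rag` has been constructed.

Because every mutation above runs inside the shared graph_db_lock, these calls can be issued from application code without extra coordination against concurrent inserts.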
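A second self-contained illustration (again not part of the diff) of how the per-field merge strategies resolve conflicting values. It calls the private _merge_entity_attributes helper directly, which is done here only to make the strategy semantics concrete; the sample field values are invented.

# Illustration only: how "concatenate", "keep_first", and "join_unique" behave.
from lightrag.prompt import GRAPH_FIELD_SEP
from lightrag.utils_graph import _merge_entity_attributes

a = {"entity_type": "PERSON", "description": "Mathematician",
     "source_id": f"chunk-1{GRAPH_FIELD_SEP}chunk-2"}
b = {"entity_type": "SCIENTIST", "description": "Wrote the first program",
     "source_id": f"chunk-2{GRAPH_FIELD_SEP}chunk-3"}

merged = _merge_entity_attributes(
    [a, b],
    {"description": "concatenate",   # values joined with a blank line between them
     "entity_type": "keep_first",    # first non-empty value wins
     "source_id": "join_unique"},    # split on GRAPH_FIELD_SEP, de-duplicated, re-joined
)
print(merged["entity_type"])  # PERSON
print(merged["source_id"])    # chunk-1 / chunk-2 / chunk-3 (set-based, order not guaranteed)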