From 1772e7a8870fece1c8c2ad058ebc636babb5c671 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 31 Mar 2025 16:21:20 +0800 Subject: [PATCH] Add delete support to all storage implementation --- lightrag/base.py | 60 +++++++++++++++++--- lightrag/kg/faiss_impl.py | 22 +++++++- lightrag/kg/json_doc_status_impl.py | 12 ++-- lightrag/kg/json_kv_impl.py | 12 ++-- lightrag/kg/nano_vector_db_impl.py | 30 +++++++++- lightrag/kg/networkx_impl.py | 31 +++++++++- lightrag/kg/oracle_impl.py | 27 +++++++++ lightrag/kg/tidb_impl.py | 88 ++++++++++++++++++++++++++++- lightrag/lightrag.py | 6 ++ 9 files changed, 266 insertions(+), 22 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index f35440c1..6b9163df 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -153,15 +153,33 @@ class BaseVectorStorage(StorageNameSpace, ABC): @abstractmethod async def upsert(self, data: dict[str, dict[str, Any]]) -> None: - """Insert or update vectors in the storage.""" + """Insert or update vectors in the storage. + + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ @abstractmethod async def delete_entity(self, entity_name: str) -> None: - """Delete a single entity by its name.""" + """Delete a single entity by its name. + + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ @abstractmethod async def delete_entity_relation(self, entity_name: str) -> None: - """Delete relations for a given entity.""" + """Delete relations for a given entity. + + Important notes for in-memory storage: + 1.
Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ @abstractmethod async def get_by_id(self, id: str) -> dict[str, Any] | None: @@ -187,6 +205,19 @@ class BaseVectorStorage(StorageNameSpace, ABC): """ pass + @abstractmethod + async def delete(self, ids: list[str]): + """Delete vectors with specified IDs + + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + + Args: + ids: List of vector IDs to be deleted + """ + @dataclass class BaseKVStorage(StorageNameSpace, ABC): @@ -206,16 +237,20 @@ class BaseKVStorage(StorageNameSpace, ABC): @abstractmethod async def upsert(self, data: dict[str, dict[str, Any]]) -> None: - """Upsert data""" + """Upsert data + + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Update flags to notify other processes that data persistence is needed + """ @abstractmethod async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs - This method will: - 1. Remove the specified records from in-memory storage - 2. For in-memory DB, update flags to notify other processes that data persistence is needed - 3. For in-memory DB, changes will be persisted to disk during the next index_done_callback + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2.
Update flags to notify other processes that data persistence is needed Args: ids (list[str]): List of document IDs to be deleted from storage @@ -267,7 +302,14 @@ class BaseGraphStorage(StorageNameSpace, ABC): async def upsert_edge( self, source_node_id: str, target_node_id: str, edge_data: dict[str, str] ) -> None: - """Delete a node from the graph.""" + """Insert or update an edge in the graph. + + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ + @abstractmethod async def delete_node(self, node_id: str) -> None: diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index 42133090..1e0659cb 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -217,6 +217,11 @@ class FaissVectorDBStorage(BaseVectorStorage): async def delete(self, ids: list[str]): """ Delete vectors for the provided custom IDs. + + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption """ logger.info(f"Deleting {len(ids)} vectors from {self.namespace}") to_remove = [] @@ -232,13 +237,22 @@ class FaissVectorDBStorage(BaseVectorStorage): ) async def delete_entity(self, entity_name: str) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2.
Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ entity_id = compute_mdhash_id(entity_name, prefix="ent-") logger.debug(f"Attempting to delete entity {entity_name} with ID {entity_id}") await self.delete([entity_id]) async def delete_entity_relation(self, entity_name: str) -> None: """ - Delete relations for a given entity by scanning metadata. + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption """ logger.debug(f"Searching relations for entity {entity_name}") relations = [] @@ -433,6 +447,12 @@ class FaissVectorDBStorage(BaseVectorStorage): async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources + This method will: + 1. Remove the vector database storage file if it exists + 2. Reinitialize the vector database client + 3. Update flags to notify other processes + 4. Changes are persisted to disk immediately + This method will remove all vectors from the Faiss index and delete the storage files. Returns: diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index bbd0cd8e..33b61eea 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -109,6 +109,11 @@ class JsonDocStatusStorage(DocStatusStorage): await clear_all_update_flags(self.namespace) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: + """ + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2.
Update flags to notify other processes that data persistence is needed + """ if not data: return logger.info(f"Inserting {len(data)} records to {self.namespace}") @@ -125,10 +130,9 @@ class JsonDocStatusStorage(DocStatusStorage): async def delete(self, doc_ids: list[str]): """Delete specific records from storage by their IDs - This method will: - 1. Remove the specified records from in-memory storage - 2. Update flags to notify other processes that data persistence is needed - 3. The changes will be persisted to disk during the next index_done_callback + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Update flags to notify other processes that data persistence is needed Args: ids (list[str]): List of document IDs to be deleted from storage diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index ea4fb51b..4972bf6a 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -114,6 +114,11 @@ class JsonKVStorage(BaseKVStorage): return set(keys) - set(self._data.keys()) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: + """ + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Update flags to notify other processes that data persistence is needed + """ if not data: return logger.info(f"Inserting {len(data)} records to {self.namespace}") @@ -124,10 +129,9 @@ class JsonKVStorage(BaseKVStorage): async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs - This method will: - 1. Remove the specified records from in-memory storage - 2. Update flags to notify other processes that data persistence is needed - 3. The changes will be persisted to disk during the next index_done_callback + Important notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2.
Update flags to notify other processes that data persistence is needed Args: ids (list[str]): List of document IDs to be deleted from storage diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 0f907a42..8c00437d 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -78,6 +78,13 @@ class NanoVectorDBStorage(BaseVectorStorage): return self._client async def upsert(self, data: dict[str, dict[str, Any]]) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ + logger.info(f"Inserting {len(data)} to {self.namespace}") if not data: return @@ -146,6 +153,11 @@ class NanoVectorDBStorage(BaseVectorStorage): async def delete(self, ids: list[str]): """Delete vectors with specified IDs + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + Args: ids: List of vector IDs to be deleted """ @@ -159,6 +171,13 @@ class NanoVectorDBStorage(BaseVectorStorage): logger.error(f"Error while deleting vectors from {self.namespace}: {e}") async def delete_entity(self, entity_name: str) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2.
Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ + try: entity_id = compute_mdhash_id(entity_name, prefix="ent-") logger.debug( @@ -176,6 +195,13 @@ class NanoVectorDBStorage(BaseVectorStorage): logger.error(f"Error deleting entity {entity_name}: {e}") async def delete_entity_relation(self, entity_name: str) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ + try: client = await self._get_client() storage = getattr(client, "_NanoVectorDB__storage") @@ -288,7 +314,9 @@ class NanoVectorDBStorage(BaseVectorStorage): 1. Remove the vector database storage file if it exists 2. Reinitialize the vector database client 3. Update flags to notify other processes - 4. Trigger index_done_callback to save the empty state + 4. Changes are persisted to disk immediately + + This method is intended for use in scenarios where all data needs to be removed. Returns: dict[str, str]: Operation status and message diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index 99e0e223..0baa72a3 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -156,16 +156,34 @@ class NetworkXStorage(BaseGraphStorage): return None async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ graph = await self._get_graph() graph.add_node(node_id, **node_data) async def upsert_edge( self, source_node_id: str, target_node_id: str, edge_data: dict[str, str] ) -> None: + """ + Important notes: + 1.
Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ graph = await self._get_graph() graph.add_edge(source_node_id, target_node_id, **edge_data) async def delete_node(self, node_id: str) -> None: + """ + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + """ graph = await self._get_graph() if graph.has_node(node_id): graph.remove_node(node_id) @@ -173,6 +191,7 @@ class NetworkXStorage(BaseGraphStorage): else: logger.warning(f"Node {node_id} not found in the graph for deletion.") + # TODO: NOT USED async def embed_nodes( self, algorithm: str ) -> tuple[np.ndarray[Any, Any], list[str]]: @@ -193,6 +212,11 @@ class NetworkXStorage(BaseGraphStorage): async def remove_nodes(self, nodes: list[str]): """Delete multiple nodes + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + Args: nodes: List of node IDs to be deleted """ @@ -204,6 +228,11 @@ class NetworkXStorage(BaseGraphStorage): async def remove_edges(self, edges: list[tuple[str, str]]): """Delete multiple edges + Important notes: + 1. Changes will be persisted to disk during the next index_done_callback + 2. Only one process should be updating the storage at a time before index_done_callback, + KG-storage-log should be used to avoid data corruption + Args: edges: List of edges to be deleted, each edge is a (source, target) tuple """ @@ -433,7 +462,7 @@ class NetworkXStorage(BaseGraphStorage): 1. Remove the graph storage file if it exists 2. Reset the graph to an empty state 3.
Update flags to notify other processes - 4. Trigger index_done_callback to save the empty state + 4. Changes are persisted to disk immediately Returns: dict[str, str]: Operation status and message diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py index 0477ea03..2560502b 100644 --- a/lightrag/kg/oracle_impl.py +++ b/lightrag/kg/oracle_impl.py @@ -392,6 +392,33 @@ class OracleKVStorage(BaseKVStorage): # Oracle handles persistence automatically pass + async def delete(self, ids: list[str]) -> dict[str, str]: + """Delete records with specified IDs from the storage. + + Args: + ids: List of record IDs to be deleted + + Returns: + Dictionary with status and message + """ + if not ids: + return {"status": "success", "message": "No IDs provided for deletion"} + + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} + + ids_list = ",".join([f"'{id}'" for id in ids]) + delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})" + + await self.db.execute(delete_sql, {"workspace": self.db.workspace}) + logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") + return {"status": "success", "message": f"Successfully deleted {len(ids)} records"} + except Exception as e: + logger.error(f"Error deleting records from {self.namespace}: {e}") + return {"status": "error", "message": str(e)} + async def drop(self) -> dict[str, str]: """Drop the storage""" try: diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index a7dc0039..895e5ebb 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -278,6 +278,35 @@ class TiDBKVStorage(BaseKVStorage): # Ti handles persistence automatically pass + async def delete(self, ids: list[str]) -> dict[str, str]: + """Delete records with specified IDs from the storage.
+ + Args: + ids: List of record IDs to be deleted + + Returns: + Dictionary with status and message + """ + if not ids: + return {"status": "success", "message": "No IDs provided for deletion"} + + try: + table_name = namespace_to_table_name(self.namespace) + id_field = namespace_to_id(self.namespace) + + if not table_name or not id_field: + return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} + + ids_list = ",".join([f"'{id}'" for id in ids]) + delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})" + + await self.db.execute(delete_sql, {"workspace": self.db.workspace}) + logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") + return {"status": "success", "message": f"Successfully deleted {len(ids)} records"} + except Exception as e: + logger.error(f"Error deleting records from {self.namespace}: {e}") + return {"status": "error", "message": str(e)} + async def drop(self) -> dict[str, str]: """Drop the storage""" try: @@ -421,11 +450,66 @@ class TiDBVectorDBStorage(BaseVectorStorage): params = {"workspace": self.db.workspace, "status": status} return await self.db.query(SQL, params, multirows=True) + async def delete(self, ids: list[str]) -> None: + """Delete vectors with specified IDs from the storage. 
+ + Args: + ids: List of vector IDs to be deleted + """ + if not ids: + return + + table_name = namespace_to_table_name(self.namespace) + id_field = namespace_to_id(self.namespace) + + if not table_name or not id_field: + logger.error(f"Unknown namespace for vector deletion: {self.namespace}") + return + + ids_list = ",".join([f"'{id}'" for id in ids]) + delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})" + + try: + await self.db.execute(delete_sql, {"workspace": self.db.workspace}) + logger.debug(f"Successfully deleted {len(ids)} vectors from {self.namespace}") + except Exception as e: + logger.error(f"Error while deleting vectors from {self.namespace}: {e}") + async def delete_entity(self, entity_name: str) -> None: - raise NotImplementedError + """Delete an entity by its name from the vector storage. + + Args: + entity_name: The name of the entity to delete + """ + try: + # Construct SQL to delete the entity + delete_sql = """DELETE FROM LIGHTRAG_GRAPH_NODES + WHERE workspace = :workspace AND name = :entity_name""" + + await self.db.execute( + delete_sql, {"workspace": self.db.workspace, "entity_name": entity_name} + ) + logger.debug(f"Successfully deleted entity {entity_name}") + except Exception as e: + logger.error(f"Error deleting entity {entity_name}: {e}") async def delete_entity_relation(self, entity_name: str) -> None: - raise NotImplementedError + """Delete all relations associated with an entity. 
+ + Args: + entity_name: The name of the entity whose relations should be deleted + """ + try: + # Delete relations where the entity is either the source or target + delete_sql = """DELETE FROM LIGHTRAG_GRAPH_EDGES + WHERE workspace = :workspace AND (source_name = :entity_name OR target_name = :entity_name)""" + + await self.db.execute( + delete_sql, {"workspace": self.db.workspace, "entity_name": entity_name} + ) + logger.debug(f"Successfully deleted relations for entity {entity_name}") + except Exception as e: + logger.error(f"Error deleting relations for entity {entity_name}: {e}") async def index_done_callback(self) -> None: # Ti handles persistence automatically diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 283bd4a4..81797385 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1449,6 +1449,7 @@ class LightRAG: loop = always_get_an_event_loop() return loop.run_until_complete(self.adelete_by_entity(entity_name)) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def adelete_by_entity(self, entity_name: str) -> None: try: await self.entities_vdb.delete_entity(entity_name) @@ -1486,6 +1487,7 @@ class LightRAG: self.adelete_by_relation(source_entity, target_entity) ) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: """Asynchronously delete a relation between two entities.
@@ -1555,6 +1557,7 @@ class LightRAG: """ return await self.doc_status.get_docs_by_status(status) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def adelete_by_doc_id(self, doc_id: str) -> None: """Delete a document and all its related data @@ -1907,6 +1910,7 @@ class LightRAG: """Synchronous version of aclear_cache.""" return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def aedit_entity( self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True ) -> dict[str, Any]: @@ -2119,6 +2123,7 @@ class LightRAG: ] ) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def aedit_relation( self, source_entity: str, target_entity: str, updated_data: dict[str, Any] ) -> dict[str, Any]: @@ -2433,6 +2438,7 @@ class LightRAG: self.acreate_relation(source_entity, target_entity, relation_data) ) + # TODO: Lock all KG-related DBs to ensure consistency across multiple processes async def amerge_entities( self, source_entities: list[str],