Add delete support to all storage implementations

This commit is contained in:
yangdx
2025-03-31 16:21:20 +08:00
parent 2cb64ad280
commit 1772e7a887
9 changed files with 266 additions and 22 deletions

View File

@@ -153,15 +153,33 @@ class BaseVectorStorage(StorageNameSpace, ABC):
@abstractmethod
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """Insert or update vectors in the storage.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
@abstractmethod
async def delete_entity(self, entity_name: str) -> None:
    """Delete a single entity by its name.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
@abstractmethod
async def delete_entity_relation(self, entity_name: str) -> None:
    """Delete relations for a given entity.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
@abstractmethod
async def get_by_id(self, id: str) -> dict[str, Any] | None:
@@ -187,6 +205,19 @@ class BaseVectorStorage(StorageNameSpace, ABC):
""" """
pass pass
@abstractmethod
async def delete(self, ids: list[str]):
    """Delete vectors with specified IDs.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption

    Args:
        ids: List of vector IDs to be deleted
    """
@dataclass
class BaseKVStorage(StorageNameSpace, ABC):
@@ -206,16 +237,20 @@ class BaseKVStorage(StorageNameSpace, ABC):
@abstractmethod
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """Upsert data.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed
    """
@abstractmethod
async def delete(self, ids: list[str]) -> None:
    """Delete specific records from storage by their IDs.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed

    Args:
        ids (list[str]): List of document IDs to be deleted from storage
@@ -267,7 +302,14 @@ class BaseGraphStorage(StorageNameSpace, ABC):
async def upsert_edge(
    self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
) -> None:
    """Delete a node from the graph.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
@abstractmethod
async def delete_node(self, node_id: str) -> None:

View File

@@ -217,6 +217,11 @@ class FaissVectorDBStorage(BaseVectorStorage):
async def delete(self, ids: list[str]):
    """
    Delete vectors for the provided custom IDs.

    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    logger.info(f"Deleting {len(ids)} vectors from {self.namespace}")
    to_remove = []
@@ -232,13 +237,22 @@ class FaissVectorDBStorage(BaseVectorStorage):
    )
async def delete_entity(self, entity_name: str) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    entity_id = compute_mdhash_id(entity_name, prefix="ent-")
    logger.debug(f"Attempting to delete entity {entity_name} with ID {entity_id}")
    await self.delete([entity_id])
async def delete_entity_relation(self, entity_name: str) -> None:
    """
    Delete relations for a given entity by scanning metadata.

    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    logger.debug(f"Searching relations for entity {entity_name}")
    relations = []
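
The relation cleanup above works by scanning each vector's metadata for the entity name. A standalone sketch of that scan follows (the src_id/tgt_id field names follow the metadata convention used for relationship vectors in LightRAG, but treat them as assumptions here):

def find_relation_ids(id_to_meta: dict[str, dict], entity_name: str) -> list[str]:
    """Return IDs of relation vectors whose source or target is entity_name."""
    relations = []
    for vec_id, meta in id_to_meta.items():
        # Relation entries carry the two endpoint names in their metadata
        if meta.get("src_id") == entity_name or meta.get("tgt_id") == entity_name:
            relations.append(vec_id)
    return relations
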
@@ -433,6 +447,12 @@ class FaissVectorDBStorage(BaseVectorStorage):
async def drop(self) -> dict[str, str]:
    """Drop all vector data from storage and clean up resources

    This method will:
    1. Remove the vector database storage file if it exists
    2. Reinitialize the vector database client
    3. Update flags to notify other processes
    4. Changes are persisted to disk immediately

    This method will remove all vectors from the Faiss index and delete the storage files.

    Returns:

View File

@@ -109,6 +109,11 @@ class JsonDocStatusStorage(DocStatusStorage):
    await clear_all_update_flags(self.namespace)
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """
    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed
    """
    if not data:
        return
    logger.info(f"Inserting {len(data)} records to {self.namespace}")
@@ -125,10 +130,9 @@ class JsonDocStatusStorage(DocStatusStorage):
async def delete(self, doc_ids: list[str]):
    """Delete specific records from storage by their IDs.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed

    Args:
        doc_ids (list[str]): List of document IDs to be deleted from storage

View File

@@ -114,6 +114,11 @@ class JsonKVStorage(BaseKVStorage):
    return set(keys) - set(self._data.keys())
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """
    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed
    """
    if not data:
        return
    logger.info(f"Inserting {len(data)} records to {self.namespace}")
@@ -124,10 +129,9 @@ class JsonKVStorage(BaseKVStorage):
async def delete(self, ids: list[str]) -> None:
    """Delete specific records from storage by their IDs.

    Important notes for in-memory storage:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Update flags to notify other processes that data persistence is needed

    Args:
        ids (list[str]): List of document IDs to be deleted from storage
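
As a usage sketch of this delete-then-persist flow (the wrapper function and argument grouping are illustrative; the base classes are the ones these JSON storages subclass), a caller purging one document might look like:

from lightrag.base import BaseKVStorage, DocStatusStorage  # base classes of the storages above


async def purge_document(
    doc_id: str,
    chunk_ids: list[str],
    text_chunks: BaseKVStorage,
    doc_status: DocStatusStorage,
) -> None:
    """Remove one document's chunk records and status record, then persist once."""
    await text_chunks.delete(chunk_ids)
    await doc_status.delete([doc_id])
    # For in-memory backends, this is where the changes actually reach disk
    await text_chunks.index_done_callback()
    await doc_status.index_done_callback()
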

View File

@@ -78,6 +78,13 @@ class NanoVectorDBStorage(BaseVectorStorage):
    return self._client
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    logger.info(f"Inserting {len(data)} to {self.namespace}")
    if not data:
        return
@@ -146,6 +153,11 @@ class NanoVectorDBStorage(BaseVectorStorage):
async def delete(self, ids: list[str]):
    """Delete vectors with specified IDs.

    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption

    Args:
        ids: List of vector IDs to be deleted
    """
@@ -159,6 +171,13 @@ class NanoVectorDBStorage(BaseVectorStorage):
logger.error(f"Error while deleting vectors from {self.namespace}: {e}") logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
async def delete_entity(self, entity_name: str) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    try:
        entity_id = compute_mdhash_id(entity_name, prefix="ent-")
        logger.debug(
@@ -176,6 +195,13 @@ class NanoVectorDBStorage(BaseVectorStorage):
logger.error(f"Error deleting entity {entity_name}: {e}") logger.error(f"Error deleting entity {entity_name}: {e}")
async def delete_entity_relation(self, entity_name: str) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    try:
        client = await self._get_client()
        storage = getattr(client, "_NanoVectorDB__storage")
@@ -288,7 +314,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
    1. Remove the vector database storage file if it exists
    2. Reinitialize the vector database client
    3. Update flags to notify other processes
    4. Changes are persisted to disk immediately

    This method is intended for use in scenarios where all data needs to be removed.

    Returns:
        dict[str, str]: Operation status and message

View File

@@ -156,16 +156,34 @@ class NetworkXStorage(BaseGraphStorage):
    return None
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    graph = await self._get_graph()
    graph.add_node(node_id, **node_data)
async def upsert_edge(
    self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    graph = await self._get_graph()
    graph.add_edge(source_node_id, target_node_id, **edge_data)
async def delete_node(self, node_id: str) -> None:
    """
    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption
    """
    graph = await self._get_graph()
    if graph.has_node(node_id):
        graph.remove_node(node_id)
@@ -173,6 +191,7 @@ class NetworkXStorage(BaseGraphStorage):
    else:
        logger.warning(f"Node {node_id} not found in the graph for deletion.")
# TODO: NOT USED
async def embed_nodes(
    self, algorithm: str
) -> tuple[np.ndarray[Any, Any], list[str]]:
@@ -193,6 +212,11 @@ class NetworkXStorage(BaseGraphStorage):
async def remove_nodes(self, nodes: list[str]):
    """Delete multiple nodes.

    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption

    Args:
        nodes: List of node IDs to be deleted
    """
@@ -204,6 +228,11 @@ class NetworkXStorage(BaseGraphStorage):
async def remove_edges(self, edges: list[tuple[str, str]]):
    """Delete multiple edges.

    Important notes:
    1. Changes will be persisted to disk during the next index_done_callback
    2. Only one process should update the storage at a time before index_done_callback;
       KG-storage-log should be used to avoid data corruption

    Args:
        edges: List of edges to be deleted, each edge is a (source, target) tuple
    """
@@ -433,7 +462,7 @@ class NetworkXStorage(BaseGraphStorage):
    1. Remove the graph storage file if it exists
    2. Reset the graph to an empty state
    3. Update flags to notify other processes
    4. Changes are persisted to disk immediately

    Returns:
        dict[str, str]: Operation status and message

View File

@@ -392,6 +392,33 @@ class OracleKVStorage(BaseKVStorage):
    # Oracle handles persistence automatically
    pass
async def delete(self, ids: list[str]) -> dict[str, str]:
    """Delete records with specified IDs from the storage.

    Args:
        ids: List of record IDs to be deleted

    Returns:
        Dictionary with status and message
    """
    if not ids:
        return {"status": "success", "message": "No IDs provided for deletion"}
    try:
        table_name = namespace_to_table_name(self.namespace)
        if not table_name:
            return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}

        ids_list = ",".join([f"'{id}'" for id in ids])
        delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"

        await self.db.execute(delete_sql, {"workspace": self.db.workspace})
        logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
        return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
    except Exception as e:
        logger.error(f"Error deleting records from {self.namespace}: {e}")
        return {"status": "error", "message": str(e)}
async def drop(self) -> dict[str, str]:
    """Drop the storage"""
    try:

View File

@@ -278,6 +278,35 @@ class TiDBKVStorage(BaseKVStorage):
    # Ti handles persistence automatically
    pass
async def delete(self, ids: list[str]) -> dict[str, str]:
    """Delete records with specified IDs from the storage.

    Args:
        ids: List of record IDs to be deleted

    Returns:
        Dictionary with status and message
    """
    if not ids:
        return {"status": "success", "message": "No IDs provided for deletion"}
    try:
        table_name = namespace_to_table_name(self.namespace)
        id_field = namespace_to_id(self.namespace)
        if not table_name or not id_field:
            return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}

        ids_list = ",".join([f"'{id}'" for id in ids])
        delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"

        await self.db.execute(delete_sql, {"workspace": self.db.workspace})
        logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
        return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
    except Exception as e:
        logger.error(f"Error deleting records from {self.namespace}: {e}")
        return {"status": "error", "message": str(e)}
async def drop(self) -> dict[str, str]:
    """Drop the storage"""
    try:
@@ -421,11 +450,66 @@ class TiDBVectorDBStorage(BaseVectorStorage):
params = {"workspace": self.db.workspace, "status": status} params = {"workspace": self.db.workspace, "status": status}
return await self.db.query(SQL, params, multirows=True) return await self.db.query(SQL, params, multirows=True)
async def delete(self, ids: list[str]) -> None:
    """Delete vectors with specified IDs from the storage.

    Args:
        ids: List of vector IDs to be deleted
    """
    if not ids:
        return

    table_name = namespace_to_table_name(self.namespace)
    id_field = namespace_to_id(self.namespace)
    if not table_name or not id_field:
        logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
        return

    ids_list = ",".join([f"'{id}'" for id in ids])
    delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"

    try:
        await self.db.execute(delete_sql, {"workspace": self.db.workspace})
        logger.debug(f"Successfully deleted {len(ids)} vectors from {self.namespace}")
    except Exception as e:
        logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
async def delete_entity(self, entity_name: str) -> None:
    """Delete an entity by its name from the vector storage.

    Args:
        entity_name: The name of the entity to delete
    """
    try:
        # Construct SQL to delete the entity
        delete_sql = """DELETE FROM LIGHTRAG_GRAPH_NODES
                        WHERE workspace = :workspace AND name = :entity_name"""

        await self.db.execute(
            delete_sql, {"workspace": self.db.workspace, "entity_name": entity_name}
        )
        logger.debug(f"Successfully deleted entity {entity_name}")
    except Exception as e:
        logger.error(f"Error deleting entity {entity_name}: {e}")
async def delete_entity_relation(self, entity_name: str) -> None:
    """Delete all relations associated with an entity.

    Args:
        entity_name: The name of the entity whose relations should be deleted
    """
    try:
        # Delete relations where the entity is either the source or target
        delete_sql = """DELETE FROM LIGHTRAG_GRAPH_EDGES
                        WHERE workspace = :workspace AND (source_name = :entity_name OR target_name = :entity_name)"""

        await self.db.execute(
            delete_sql, {"workspace": self.db.workspace, "entity_name": entity_name}
        )
        logger.debug(f"Successfully deleted relations for entity {entity_name}")
    except Exception as e:
        logger.error(f"Error deleting relations for entity {entity_name}: {e}")
async def index_done_callback(self) -> None:
    # Ti handles persistence automatically

View File

@@ -1449,6 +1449,7 @@ class LightRAG:
    loop = always_get_an_event_loop()
    return loop.run_until_complete(self.adelete_by_entity(entity_name))

# TODO: Lock all KG-related DBs to ensure consistency across multiple processes
async def adelete_by_entity(self, entity_name: str) -> None:
    try:
        await self.entities_vdb.delete_entity(entity_name)
@@ -1486,6 +1487,7 @@ class LightRAG:
        self.adelete_by_relation(source_entity, target_entity)
    )

# TODO: Lock all KG-related DBs to ensure consistency across multiple processes
async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
    """Asynchronously delete a relation between two entities.
@@ -1555,6 +1557,7 @@ class LightRAG:
""" """
return await self.doc_status.get_docs_by_status(status) return await self.doc_status.get_docs_by_status(status)
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def adelete_by_doc_id(self, doc_id: str) -> None: async def adelete_by_doc_id(self, doc_id: str) -> None:
"""Delete a document and all its related data """Delete a document and all its related data
@@ -1907,6 +1910,7 @@ class LightRAG:
"""Synchronous version of aclear_cache.""" """Synchronous version of aclear_cache."""
return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes))
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def aedit_entity( async def aedit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]: ) -> dict[str, Any]:
@@ -2119,6 +2123,7 @@ class LightRAG:
        ]
    )

# TODO: Lock all KG-related DBs to ensure consistency across multiple processes
async def aedit_relation(
    self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
) -> dict[str, Any]:
@@ -2433,6 +2438,7 @@ class LightRAG:
        self.acreate_relation(source_entity, target_entity, relation_data)
    )

# TODO: Lock all KG-related DBs to ensure consistency across multiple processes
async def amerge_entities(
    self,
    source_entities: list[str],
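
The repeated TODO asks for a lock across all KG-related storages. Within a single process, the shape of that guard is an async lock around the multi-storage mutation; a sketch follows (entities_vdb appears in this diff, the other two attribute names follow LightRAG's adelete_by_entity and should be treated as assumptions here, and true cross-process consistency, which the TODO is really about, would need a shared multiprocess-aware lock that is not shown):

import asyncio

# One lock guarding every mutation that spans multiple KG storages (single process only).
_kg_mutation_lock = asyncio.Lock()


async def delete_entity_consistently(rag, entity_name: str) -> None:
    """Serialize cross-storage deletes so no reader sees a half-applied state."""
    async with _kg_mutation_lock:
        await rag.entities_vdb.delete_entity(entity_name)
        await rag.relationships_vdb.delete_entity_relation(entity_name)
        await rag.chunk_entity_relation_graph.delete_node(entity_name)
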