Add drop_cace_by_modes to all KV storage implementation

This commit is contained in:
yangdx
2025-03-31 23:10:21 +08:00
parent 04967b33cc
commit 3d4f8f67c9
8 changed files with 228 additions and 23 deletions

View File

@@ -259,6 +259,20 @@ class BaseKVStorage(StorageNameSpace, ABC):
None None
""" """
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed
Args:
modes (list[str]): List of cache modes to be dropped from storage
Returns:
True: if the cache drop successfully
False: if the cache drop failed, or the cache mode is not supported
"""
@dataclass @dataclass
class BaseGraphStorage(StorageNameSpace, ABC): class BaseGraphStorage(StorageNameSpace, ABC):
@@ -310,7 +324,6 @@ class BaseGraphStorage(StorageNameSpace, ABC):
KG-storage-log should be used to avoid data corruption KG-storage-log should be used to avoid data corruption
""" """
@abstractmethod @abstractmethod
async def delete_node(self, node_id: str) -> None: async def delete_node(self, node_id: str) -> None:
"""Embed nodes using an algorithm.""" """Embed nodes using an algorithm."""
@@ -381,6 +394,10 @@ class DocStatusStorage(BaseKVStorage, ABC):
) -> dict[str, DocProcessingStatus]: ) -> dict[str, DocProcessingStatus]:
"""Get all documents with a specific status""" """Get all documents with a specific status"""
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Drop cache is not supported for Doc Status storage"""
return False
class StoragesStatus(str, Enum): class StoragesStatus(str, Enum):
"""Storages status""" """Storages status"""

View File

@@ -127,7 +127,7 @@ class JsonDocStatusStorage(DocStatusStorage):
async with self._storage_lock: async with self._storage_lock:
return self._data.get(id) return self._data.get(id)
async def delete(self, doc_ids: list[str]): async def delete(self, doc_ids: list[str]) -> None:
"""Delete specific records from storage by their IDs """Delete specific records from storage by their IDs
Importance notes for in-memory storage: Importance notes for in-memory storage:

View File

@@ -144,6 +144,30 @@ class JsonKVStorage(BaseKVStorage):
self._data.pop(doc_id, None) self._data.pop(doc_id, None)
await set_all_update_flags(self.namespace) await set_all_update_flags(self.namespace)
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by by cache mode
Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed
Args:
ids (list[str]): List of cache mode to be drop from storage
Returns:
True: if the cache drop successfully
False: if the cache drop failed
"""
if not modes:
return False
try:
await self.delete(modes)
return True
except Exception:
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources """Drop all data from storage and clean up resources
This action will persistent the data to disk immediately. This action will persistent the data to disk immediately.

View File

@@ -165,6 +165,28 @@ class MongoKVStorage(BaseKVStorage):
except PyMongoError as e: except PyMongoError as e:
logger.error(f"Error deleting documents from {self.namespace}: {e}") logger.error(f"Error deleting documents from {self.namespace}: {e}")
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
Args:
modes (list[str]): List of cache modes to be dropped from storage
Returns:
bool: True if successful, False otherwise
"""
if not modes:
return False
try:
# Build regex pattern to match documents with the specified modes
pattern = f"^({'|'.join(modes)})_"
result = await self._data.delete_many({"_id": {"$regex": pattern}})
logger.info(f"Deleted {result.deleted_count} documents by modes: {modes}")
return True
except Exception as e:
logger.error(f"Error deleting cache by modes {modes}: {e}")
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection. """Drop the storage by removing all documents in the collection.

View File

@@ -392,32 +392,63 @@ class OracleKVStorage(BaseKVStorage):
# Oracle handles persistence automatically # Oracle handles persistence automatically
pass pass
async def delete(self, ids: list[str]) -> dict[str, str]: async def delete(self, ids: list[str]) -> None:
"""Delete records with specified IDs from the storage. """Delete records with specified IDs from the storage.
Args: Args:
ids: List of record IDs to be deleted ids: List of record IDs to be deleted
Returns:
Dictionary with status and message
""" """
if not ids: if not ids:
return {"status": "success", "message": "No IDs provided for deletion"} return
try: try:
table_name = namespace_to_table_name(self.namespace) table_name = namespace_to_table_name(self.namespace)
if not table_name: if not table_name:
return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} logger.error(f"Unknown namespace for deletion: {self.namespace}")
return
ids_list = ",".join([f"'{id}'" for id in ids]) ids_list = ",".join([f"'{id}'" for id in ids])
delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})" delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"
await self.db.execute(delete_sql, {"workspace": self.db.workspace}) await self.db.execute(delete_sql, {"workspace": self.db.workspace})
logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
except Exception as e: except Exception as e:
logger.error(f"Error deleting records from {self.namespace}: {e}") logger.error(f"Error deleting records from {self.namespace}: {e}")
return {"status": "error", "message": str(e)}
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
Args:
modes (list[str]): List of cache modes to be dropped from storage
Returns:
bool: True if successful, False otherwise
"""
if not modes:
return False
try:
table_name = namespace_to_table_name(self.namespace)
if not table_name:
return False
if table_name != "LIGHTRAG_LLM_CACHE":
return False
# 构建Oracle风格的IN查询
modes_list = ", ".join([f"'{mode}'" for mode in modes])
sql = f"""
DELETE FROM {table_name}
WHERE workspace = :workspace
AND cache_mode IN ({modes_list})
"""
logger.info(f"Deleting cache by modes: {modes}")
await self.db.execute(sql, {"workspace": self.db.workspace})
return True
except Exception as e:
logger.error(f"Error deleting cache by modes {modes}: {e}")
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop the storage""" """Drop the storage"""

View File

@@ -378,6 +378,67 @@ class PGKVStorage(BaseKVStorage):
# PG handles persistence automatically # PG handles persistence automatically
pass pass
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
Args:
ids (list[str]): List of document IDs to be deleted from storage
Returns:
None
"""
if not ids:
return
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(f"Unknown namespace for deletion: {self.namespace}")
return
delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
try:
await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}")
except Exception as e:
logger.error(f"Error while deleting records from {self.namespace}: {e}")
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
Args:
modes (list[str]): List of cache modes to be dropped from storage
Returns:
bool: True if successful, False otherwise
"""
if not modes:
return False
try:
table_name = namespace_to_table_name(self.namespace)
if not table_name:
return False
if table_name != "LIGHTRAG_LLM_CACHE":
return False
sql = f"""
DELETE FROM {table_name}
WHERE workspace = $1 AND mode = ANY($2)
"""
params = {
"workspace": self.db.workspace,
"modes": modes
}
logger.info(f"Deleting cache by modes: {modes}")
await self.db.execute(sql, params)
return True
except Exception as e:
logger.error(f"Error deleting cache by modes {modes}: {e}")
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop the storage""" """Drop the storage"""
try: try:
@@ -558,13 +619,10 @@ class PGVectorStorage(BaseVectorStorage):
logger.error(f"Unknown namespace for vector deletion: {self.namespace}") logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
return return
ids_list = ",".join([f"'{id}'" for id in ids]) delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
delete_sql = (
f"DELETE FROM {table_name} WHERE workspace=$1 AND id IN ({ids_list})"
)
try: try:
await self.db.execute(delete_sql, {"workspace": self.db.workspace}) await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids})
logger.debug( logger.debug(
f"Successfully deleted {len(ids)} vectors from {self.namespace}" f"Successfully deleted {len(ids)} vectors from {self.namespace}"
) )

View File

@@ -84,6 +84,28 @@ class RedisKVStorage(BaseKVStorage):
f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}" f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
) )
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by by cache mode
Importance notes for Redis storage:
1. This will immediately delete the specified cache modes from Redis
Args:
modes (list[str]): List of cache mode to be drop from storage
Returns:
True: if the cache drop successfully
False: if the cache drop failed
"""
if not modes:
return False
try:
await self.delete(modes)
return True
except Exception:
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all keys under the current namespace. """Drop the storage by removing all keys under the current namespace.

View File

@@ -278,34 +278,65 @@ class TiDBKVStorage(BaseKVStorage):
# Ti handles persistence automatically # Ti handles persistence automatically
pass pass
async def delete(self, ids: list[str]) -> dict[str, str]: async def delete(self, ids: list[str]) -> None:
"""Delete records with specified IDs from the storage. """Delete records with specified IDs from the storage.
Args: Args:
ids: List of record IDs to be deleted ids: List of record IDs to be deleted
Returns:
Dictionary with status and message
""" """
if not ids: if not ids:
return {"status": "success", "message": "No IDs provided for deletion"} return
try: try:
table_name = namespace_to_table_name(self.namespace) table_name = namespace_to_table_name(self.namespace)
id_field = namespace_to_id(self.namespace) id_field = namespace_to_id(self.namespace)
if not table_name or not id_field: if not table_name or not id_field:
return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} logger.error(f"Unknown namespace for deletion: {self.namespace}")
return
ids_list = ",".join([f"'{id}'" for id in ids]) ids_list = ",".join([f"'{id}'" for id in ids])
delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})" delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"
await self.db.execute(delete_sql, {"workspace": self.db.workspace}) await self.db.execute(delete_sql, {"workspace": self.db.workspace})
logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
return {"status": "success", "message": f"Successfully deleted {len(ids)} records"}
except Exception as e: except Exception as e:
logger.error(f"Error deleting records from {self.namespace}: {e}") logger.error(f"Error deleting records from {self.namespace}: {e}")
return {"status": "error", "message": str(e)}
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode
Args:
modes (list[str]): List of cache modes to be dropped from storage
Returns:
bool: True if successful, False otherwise
"""
if not modes:
return False
try:
table_name = namespace_to_table_name(self.namespace)
if not table_name:
return False
if table_name != "LIGHTRAG_LLM_CACHE":
return False
# 构建MySQL风格的IN查询
modes_list = ", ".join([f"'{mode}'" for mode in modes])
sql = f"""
DELETE FROM {table_name}
WHERE workspace = :workspace
AND mode IN ({modes_list})
"""
logger.info(f"Deleting cache by modes: {modes}")
await self.db.execute(sql, {"workspace": self.db.workspace})
return True
except Exception as e:
logger.error(f"Error deleting cache by modes {modes}: {e}")
return False
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop the storage""" """Drop the storage"""