From 3d4f8f67c947c8dfc8ba2cfcdc19d1ea2a78cd61 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 31 Mar 2025 23:10:21 +0800 Subject: [PATCH] Add drop_cache_by_modes to all KV storage implementations --- lightrag/base.py | 19 +++++++- lightrag/kg/json_doc_status_impl.py | 2 +- lightrag/kg/json_kv_impl.py | 24 ++++++++++ lightrag/kg/mongo_impl.py | 22 ++++++++++ lightrag/kg/oracle_impl.py | 47 ++++++++++++++++---- lightrag/kg/postgres_impl.py | 68 ++++++++++++++++++++++++++--- lightrag/kg/redis_impl.py | 22 ++++++++++ lightrag/kg/tidb_impl.py | 47 ++++++++++++++++---- 8 files changed, 228 insertions(+), 23 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index 6b9163df..05f30d3c 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -259,6 +259,20 @@ class BaseKVStorage(StorageNameSpace, ABC): None """ + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Importance notes for in-memory storage: + 1. Changes will be persisted to disk during the next index_done_callback + 2. 
update flags to notify other processes that data persistence is needed + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + True: if the cache was dropped successfully + False: if the cache drop failed, or the cache mode is not supported + """ @dataclass class BaseGraphStorage(StorageNameSpace, ABC): @@ -310,7 +324,6 @@ class BaseGraphStorage(StorageNameSpace, ABC): KG-storage-log should be used to avoid data corruption """ - @abstractmethod async def delete_node(self, node_id: str) -> None: """Embed nodes using an algorithm.""" @@ -381,6 +394,10 @@ class DocStatusStorage(BaseKVStorage, ABC): ) -> dict[str, DocProcessingStatus]: """Get all documents with a specific status""" + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Drop cache is not supported for Doc Status storage""" + return False + class StoragesStatus(str, Enum): """Storages status""" diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 33b61eea..003a6733 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -127,7 +127,7 @@ class JsonDocStatusStorage(DocStatusStorage): async with self._storage_lock: return self._data.get(id) - async def delete(self, doc_ids: list[str]): + async def delete(self, doc_ids: list[str]) -> None: """Delete specific records from storage by their IDs Importance notes for in-memory storage: diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 4972bf6a..8857aa9a 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -144,6 +144,30 @@ class JsonKVStorage(BaseKVStorage): self._data.pop(doc_id, None) await set_all_update_flags(self.namespace) + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Importance notes for in-memory storage: + 1. 
Changes will be persisted to disk during the next index_done_callback + 2. update flags to notify other processes that data persistence is needed + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + True: if the cache was dropped successfully + False: if the cache drop failed + """ + if not modes: + return False + + try: + await self.delete(modes) + return True + except Exception: + return False + + async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources This action will persistent the data to disk immediately. diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index d5832af0..24c215be 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -165,6 +165,28 @@ class MongoKVStorage(BaseKVStorage): except PyMongoError as e: logger.error(f"Error deleting documents from {self.namespace}: {e}") + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + bool: True if successful, False otherwise + """ + if not modes: + return False + + try: + # Build regex pattern to match documents with the specified modes + pattern = f"^({'|'.join(modes)})_" + result = await self._data.delete_many({"_id": {"$regex": pattern}}) + logger.info(f"Deleted {result.deleted_count} documents by modes: {modes}") + return True + except Exception as e: + logger.error(f"Error deleting cache by modes {modes}: {e}") + return False + async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. 
diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py index 2560502b..6d3e2e8d 100644 --- a/lightrag/kg/oracle_impl.py +++ b/lightrag/kg/oracle_impl.py @@ -392,32 +392,63 @@ class OracleKVStorage(BaseKVStorage): # Oracle handles persistence automatically pass - async def delete(self, ids: list[str]) -> dict[str, str]: + async def delete(self, ids: list[str]) -> None: """Delete records with specified IDs from the storage. Args: ids: List of record IDs to be deleted - - Returns: - Dictionary with status and message """ if not ids: - return {"status": "success", "message": "No IDs provided for deletion"} + return try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} + logger.error(f"Unknown namespace for deletion: {self.namespace}") + return ids_list = ",".join([f"'{id}'" for id in ids]) delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})" await self.db.execute(delete_sql, {"workspace": self.db.workspace}) logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") - return {"status": "success", "message": f"Successfully deleted {len(ids)} records"} except Exception as e: logger.error(f"Error deleting records from {self.namespace}: {e}") - return {"status": "error", "message": str(e)} + + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + bool: True if successful, False otherwise + """ + if not modes: + return False + + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return False + + if table_name != "LIGHTRAG_LLM_CACHE": + return False + + # 构建Oracle风格的IN查询 + modes_list = ", ".join([f"'{mode}'" for mode in modes]) + sql = f""" + DELETE FROM {table_name} + WHERE workspace = :workspace + AND 
cache_mode IN ({modes_list}) + """ + + logger.info(f"Deleting cache by modes: {modes}") + await self.db.execute(sql, {"workspace": self.db.workspace}) + return True + except Exception as e: + logger.error(f"Error deleting cache by modes {modes}: {e}") + return False async def drop(self) -> dict[str, str]: """Drop the storage""" diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index ff90d14b..34268e32 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -378,6 +378,67 @@ class PGKVStorage(BaseKVStorage): # PG handles persistence automatically pass + async def delete(self, ids: list[str]) -> None: + """Delete specific records from storage by their IDs + + Args: + ids (list[str]): List of document IDs to be deleted from storage + + Returns: + None + """ + if not ids: + return + + table_name = namespace_to_table_name(self.namespace) + if not table_name: + logger.error(f"Unknown namespace for deletion: {self.namespace}") + return + + delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" + + try: + await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) + logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}") + except Exception as e: + logger.error(f"Error while deleting records from {self.namespace}: {e}") + + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + bool: True if successful, False otherwise + """ + if not modes: + return False + + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return False + + if table_name != "LIGHTRAG_LLM_CACHE": + return False + + sql = f""" + DELETE FROM {table_name} + WHERE workspace = $1 AND mode = ANY($2) + """ + params = { + "workspace": self.db.workspace, + "modes": modes + } + + logger.info(f"Deleting 
cache by modes: {modes}") + await self.db.execute(sql, params) + return True + except Exception as e: + logger.error(f"Error deleting cache by modes {modes}: {e}") + return False + async def drop(self) -> dict[str, str]: """Drop the storage""" try: @@ -558,13 +619,10 @@ class PGVectorStorage(BaseVectorStorage): logger.error(f"Unknown namespace for vector deletion: {self.namespace}") return - ids_list = ",".join([f"'{id}'" for id in ids]) - delete_sql = ( - f"DELETE FROM {table_name} WHERE workspace=$1 AND id IN ({ids_list})" - ) + delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" try: - await self.db.execute(delete_sql, {"workspace": self.db.workspace}) + await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) logger.debug( f"Successfully deleted {len(ids)} vectors from {self.namespace}" ) diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 9ff50008..964b0ad7 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -84,6 +84,28 @@ class RedisKVStorage(BaseKVStorage): f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}" ) + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Importance notes for Redis storage: + 1. This will immediately delete the specified cache modes from Redis + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + True: if the cache was dropped successfully + False: if the cache drop failed + """ + if not modes: + return False + + try: + await self.delete(modes) + return True + except Exception: + return False + async def drop(self) -> dict[str, str]: """Drop the storage by removing all keys under the current namespace. 
diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index 895e5ebb..3d06ad0a 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -278,34 +278,65 @@ class TiDBKVStorage(BaseKVStorage): # Ti handles persistence automatically pass - async def delete(self, ids: list[str]) -> dict[str, str]: + async def delete(self, ids: list[str]) -> None: """Delete records with specified IDs from the storage. Args: ids: List of record IDs to be deleted - - Returns: - Dictionary with status and message """ if not ids: - return {"status": "success", "message": "No IDs provided for deletion"} + return try: table_name = namespace_to_table_name(self.namespace) id_field = namespace_to_id(self.namespace) if not table_name or not id_field: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} + logger.error(f"Unknown namespace for deletion: {self.namespace}") + return ids_list = ",".join([f"'{id}'" for id in ids]) delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})" await self.db.execute(delete_sql, {"workspace": self.db.workspace}) logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") - return {"status": "success", "message": f"Successfully deleted {len(ids)} records"} except Exception as e: logger.error(f"Error deleting records from {self.namespace}: {e}") - return {"status": "error", "message": str(e)} + + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + """Delete specific records from storage by cache mode + + Args: + modes (list[str]): List of cache modes to be dropped from storage + + Returns: + bool: True if successful, False otherwise + """ + if not modes: + return False + + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return False + + if table_name != "LIGHTRAG_LLM_CACHE": + return False + + # 构建MySQL风格的IN查询 + modes_list = ", ".join([f"'{mode}'" for mode in modes]) + sql = f""" + DELETE 
FROM {table_name} + WHERE workspace = :workspace + AND mode IN ({modes_list}) + """ + + logger.info(f"Deleting cache by modes: {modes}") + await self.db.execute(sql, {"workspace": self.db.workspace}) + return True + except Exception as e: + logger.error(f"Error deleting cache by modes {modes}: {e}") + return False async def drop(self) -> dict[str, str]: """Drop the storage"""