From 1df4b777d7bbdcef0161f9b84cdf594a7c3f2fd1 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 30 Mar 2025 15:17:57 +0800
Subject: [PATCH] Add drop functions to storage implementations

---
 lightrag/base.py                    | 25 ++++
 lightrag/kg/chroma_impl.py          | 28 ++++-
 lightrag/kg/faiss_impl.py           | 35 +++++++++++
 lightrag/kg/json_doc_status_impl.py | 28 +++++++--
 lightrag/kg/json_kv_impl.py         | 23 +++++++
 lightrag/kg/milvus_impl.py          | 30 ++++++++-
 lightrag/kg/nano_vector_db_impl.py  | 36 +++++++++++
 lightrag/kg/neo4j_impl.py           | 23 +++++++
 lightrag/kg/networkx_impl.py        | 31 ++++++++++
 lightrag/kg/qdrant_impl.py          | 95 +++++++++++++++++++++++++++--
 10 files changed, 339 insertions(+), 15 deletions(-)

diff --git a/lightrag/base.py b/lightrag/base.py
index ad41fc58..bff92b34 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -111,6 +111,31 @@ class StorageNameSpace(ABC):
     @abstractmethod
     async def index_done_callback(self) -> None:
         """Commit the storage operations after indexing"""
+
+    @abstractmethod
+    async def drop(self) -> dict[str, str]:
+        """Drop all data from storage and clean up resources
+
+        This abstract method defines the contract for dropping all data from a storage implementation.
+        Each storage type must implement this method to:
+        1. Clear all data from memory and/or external storage
+        2. Remove any associated storage files if applicable
+        3. Reset the storage to its initial state
+        4. Handle cleanup of any resources
+        5. Notify other processes if necessary
+
+        Returns:
+            dict[str, str]: Operation status and message with the following format:
+                {
+                    "status": str,  # "success" or "error"
+                    "message": str  # "data dropped" on success, error details on failure
+                }
+
+        Implementation specific:
+        - On success: return {"status": "success", "message": "data dropped"}
+        - On failure: return {"status": "error", "message": "<error details>"}
+        - If not supported: return {"status": "error", "message": "unsupported"}
+        """


 @dataclass
diff --git a/lightrag/kg/chroma_impl.py b/lightrag/kg/chroma_impl.py
index 84d43326..052088d4 100644
--- a/lightrag/kg/chroma_impl.py
+++ b/lightrag/kg/chroma_impl.py
@@ -1,4 +1,5 @@
 import asyncio
+import os
 from dataclasses import dataclass
 from typing import Any, final
 import numpy as np
@@ -10,8 +11,8 @@ import pipmaster as pm
 if not pm.is_installed("chromadb"):
     pm.install("chromadb")
 
-from chromadb import HttpClient, PersistentClient
-from chromadb.config import Settings
+from chromadb import HttpClient, PersistentClient  # type: ignore
+from chromadb.config import Settings  # type: ignore
 
 
 @final
@@ -335,3 +336,26 @@ class ChromaVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all vector data from storage and clean up resources
+
+        This method will delete all documents from the ChromaDB collection.
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            # Get all IDs in the collection
+            result = self._collection.get(include=[])
+            if result and result["ids"] and len(result["ids"]) > 0:
+                # Delete all documents
+                self._collection.delete(ids=result["ids"])
+
+            logger.info(f"Process {os.getpid()} drop ChromaDB collection {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py
index b8176037..42133090 100644
--- a/lightrag/kg/faiss_impl.py
+++ b/lightrag/kg/faiss_impl.py
@@ -429,3 +429,38 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 results.append({**metadata, "id": metadata.get("__id__")})
 
         return results
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all vector data from storage and clean up resources
+
+        This method will remove all vectors from the Faiss index and delete the storage files.
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._storage_lock:
+                # Reset the index
+                self._index = faiss.IndexFlatIP(self._dim)
+                self._id_to_meta = {}
+
+                # Remove storage files if they exist
+                if os.path.exists(self._faiss_index_file):
+                    os.remove(self._faiss_index_file)
+                if os.path.exists(self._meta_file):
+                    os.remove(self._meta_file)
+
+                self._id_to_meta = {}
+                self._load_faiss_index()
+
+                # Notify other processes
+                await set_all_update_flags(self.namespace)
+                self.storage_updated.value = False
+
+            logger.info(f"Process {os.getpid()} drop FAISS index {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping FAISS index {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index 22da07b5..0a3f5470 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -129,9 +129,25 @@ class JsonDocStatusStorage(DocStatusStorage):
             await set_all_update_flags(self.namespace)
         await self.index_done_callback()
 
-    async def drop(self) -> None:
-        """Drop the storage"""
-        async with self._storage_lock:
-            self._data.clear()
-            await set_all_update_flags(self.namespace)
-        await self.index_done_callback()
+    async def drop(self) -> dict[str, str]:
+        """Drop all document status data from storage and clean up resources
+
+        This method will:
+        1. Clear all document status data from memory
+        2. Update flags to notify other processes
+        3. Trigger index_done_callback to save the empty state
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._storage_lock:
+                self._data.clear()
+            await self.index_done_callback()
+            logger.info(f"Process {os.getpid()} drop {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index e7deaf15..2ca9c03e 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -127,3 +127,26 @@
                 self._data.pop(doc_id, None)
             await set_all_update_flags(self.namespace)
         await self.index_done_callback()
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all data from storage and clean up resources
+
+        This method will:
+        1. Clear all data from memory
+        2. Update flags to notify other processes
+        3. Trigger index_done_callback to save the empty state
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._storage_lock:
+                self._data.clear()
+            await self.index_done_callback()
+            logger.info(f"Process {os.getpid()} drop {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py
index 4b4577ca..74cf416a 100644
--- a/lightrag/kg/milvus_impl.py
+++ b/lightrag/kg/milvus_impl.py
@@ -15,7 +15,7 @@ if not pm.is_installed("pymilvus"):
     pm.install("pymilvus")
 
 import configparser
-from pymilvus import MilvusClient
+from pymilvus import MilvusClient  # type: ignore
 
 config = configparser.ConfigParser()
 config.read("config.ini", "utf-8")
@@ -287,3 +287,31 @@
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all vector data from storage and clean up resources
+
+        This method will delete all data from the Milvus collection.
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            # Drop the collection and recreate it
+            if self._client.has_collection(self.namespace):
+                self._client.drop_collection(self.namespace)
+
+            # Recreate the collection
+            MilvusVectorDBStorage.create_collection_if_not_exist(
+                self._client,
+                self.namespace,
+                dimension=self.embedding_func.embedding_dim,
+            )
+
+            logger.info(f"Process {os.getpid()} drop Milvus collection {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping Milvus collection {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py
index 553ba0b2..0f907a42 100644
--- a/lightrag/kg/nano_vector_db_impl.py
+++ b/lightrag/kg/nano_vector_db_impl.py
@@ -280,3 +280,39 @@
 
         client = await self._get_client()
         return client.get(ids)
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all vector data from storage and clean up resources
+
+        This method will:
+        1. Remove the vector database storage file if it exists
+        2. Reinitialize the vector database client
+        3. Update flags to notify other processes
+        4. Trigger index_done_callback to save the empty state
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._storage_lock:
+                # delete _client_file_name
+                if os.path.exists(self._client_file_name):
+                    os.remove(self._client_file_name)
+
+                self._client = NanoVectorDB(
+                    self.embedding_func.embedding_dim,
+                    storage_file=self._client_file_name,
+                )
+
+                # Notify other processes that data has been updated
+                await set_all_update_flags(self.namespace)
+                # Reset own update flag to avoid self-reloading
+                self.storage_updated.value = False
+
+            logger.info(f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py
index 2df420df..3f2545a7 100644
--- a/lightrag/kg/neo4j_impl.py
+++ b/lightrag/kg/neo4j_impl.py
@@ -1028,3 +1028,26 @@
         self, algorithm: str
     ) -> tuple[np.ndarray[Any, Any], list[str]]:
         raise NotImplementedError
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all data from storage and clean up resources
+
+        This method will delete all nodes and relationships in the Neo4j database.
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._driver.session(database=self._DATABASE) as session:
+                # Delete all nodes and relationships
+                query = "MATCH (n) DETACH DELETE n"
+                result = await session.run(query)
+                await result.consume()  # Ensure result is fully consumed
+
+            logger.info(f"Process {os.getpid()} drop Neo4j database {self._DATABASE}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py
index 324fe7af..99e0e223 100644
--- a/lightrag/kg/networkx_impl.py
+++ b/lightrag/kg/networkx_impl.py
@@ -42,6 +42,7 @@ class NetworkXStorage(BaseGraphStorage):
         )
         nx.write_graphml(graph, file_name)
 
+    # TODO: deprecated, remove later
     @staticmethod
     def _stabilize_graph(graph: nx.Graph) -> nx.Graph:
         """Refer to https://github.com/microsoft/graphrag/index/graph/utils/stable_lcc.py
@@ -424,3 +425,33 @@
             return False  # Return error
 
         return True
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all graph data from storage and clean up resources
+
+        This method will:
+        1. Remove the graph storage file if it exists
+        2. Reset the graph to an empty state
+        3. Update flags to notify other processes
+        4. Trigger index_done_callback to save the empty state
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            async with self._storage_lock:
+                # Delete the graph storage file
+                if os.path.exists(self._graphml_xml_file):
+                    os.remove(self._graphml_xml_file)
+                self._graph = nx.Graph()
+                # Notify other processes that data has been updated
+                await set_all_update_flags(self.namespace)
+                # Reset own update flag to avoid self-reloading
+                self.storage_updated.value = False
+            logger.info(f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping graph {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index e32c4335..855b98ae 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -8,18 +8,15 @@ import uuid
 from ..utils import logger
 from ..base import BaseVectorStorage
 import configparser
-
-
-config = configparser.ConfigParser()
-config.read("config.ini", "utf-8")
-
 import pipmaster as pm
 
 if not pm.is_installed("qdrant-client"):
     pm.install("qdrant-client")
 
-from qdrant_client import QdrantClient, models
+from qdrant_client import QdrantClient, models  # type: ignore
 
+config = configparser.ConfigParser()
+config.read("config.ini", "utf-8")
 
 def compute_mdhash_id_for_qdrant(
     content: str, prefix: str = "", style: str = "simple"
@@ -275,3 +272,89 @@
         except Exception as e:
             logger.error(f"Error searching for prefix '{prefix}': {e}")
             return []
+
+    async def get_by_id(self, id: str) -> dict[str, Any] | None:
+        """Get vector data by its ID
+
+        Args:
+            id: The unique identifier of the vector
+
+        Returns:
+            The vector data if found, or None if not found
+        """
+        try:
+            # Convert to Qdrant compatible ID
+            qdrant_id = compute_mdhash_id_for_qdrant(id)
+
+            # Retrieve the point by ID
+            result = self._client.retrieve(
+                collection_name=self.namespace,
+                ids=[qdrant_id],
+                with_payload=True,
+            )
+
+            if not result:
+                return None
+
+            return result[0].payload
+        except Exception as e:
+            logger.error(f"Error retrieving vector data for ID {id}: {e}")
+            return None
+
+    async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
+        """Get multiple vector data by their IDs
+
+        Args:
+            ids: List of unique identifiers
+
+        Returns:
+            List of vector data objects that were found
+        """
+        if not ids:
+            return []
+
+        try:
+            # Convert to Qdrant compatible IDs
+            qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
+
+            # Retrieve the points by IDs
+            results = self._client.retrieve(
+                collection_name=self.namespace,
+                ids=qdrant_ids,
+                with_payload=True,
+            )
+
+            return [point.payload for point in results]
+        except Exception as e:
+            logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
+            return []
+
+    async def drop(self) -> dict[str, str]:
+        """Drop all vector data from storage and clean up resources
+
+        This method will delete all data from the Qdrant collection.
+
+        Returns:
+            dict[str, str]: Operation status and message
+            - On success: {"status": "success", "message": "data dropped"}
+            - On failure: {"status": "error", "message": "<error details>"}
+        """
+        try:
+            # Delete the collection and recreate it
+            if self._client.collection_exists(self.namespace):
+                self._client.delete_collection(self.namespace)
+
+            # Recreate the collection
+            QdrantVectorDBStorage.create_collection_if_not_exist(
+                self._client,
+                self.namespace,
+                vectors_config=models.VectorParams(
+                    size=self.embedding_func.embedding_dim, distance=models.Distance.COSINE
+                ),
+            )
+
+            logger.info(f"Process {os.getpid()} drop Qdrant collection {self.namespace}")
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}")
+            return {"status": "error", "message": str(e)}
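
--
A minimal caller-side sketch (illustration only, not part of the patch) of how the drop() contract defined in lightrag/base.py above is meant to be consumed. FakeStorage is a hypothetical stand-in used purely to keep the snippet self-contained; the real implementations are the storage classes modified above, whose constructors are not shown here.

import asyncio


class FakeStorage:
    """Hypothetical stand-in that follows the StorageNameSpace.drop() contract."""

    namespace = "demo"

    async def drop(self) -> dict[str, str]:
        # Real implementations return exactly this shape on success
        return {"status": "success", "message": "data dropped"}


async def drop_all(storages) -> None:
    # Drop every storage and report the outcome using the documented status dict
    for storage in storages:
        result = await storage.drop()
        if result["status"] == "success":
            print(f"{storage.namespace}: {result['message']}")
        else:
            # On failure the message carries the error details, or "unsupported"
            print(f"{storage.namespace}: drop failed: {result['message']}")


if __name__ == "__main__":
    asyncio.run(drop_all([FakeStorage()]))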