Add drop funtions to storage implementations

This commit is contained in:
yangdx
2025-03-30 15:17:57 +08:00
parent 1a25a78e8a
commit 1df4b777d7
10 changed files with 339 additions and 15 deletions

View File

@@ -112,6 +112,31 @@ class StorageNameSpace(ABC):
async def index_done_callback(self) -> None:
"""Commit the storage operations after indexing"""
@abstractmethod
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This abstract method defines the contract for dropping all data from a storage implementation.
Each storage type must implement this method to:
1. Clear all data from memory and/or external storage
2. Remove any associated storage files if applicable
3. Reset the storage to its initial state
4. Handle cleanup of any resources
5. Notify other processes if necessary
Returns:
dict[str, str]: Operation status and message with the following format:
{
"status": str, # "success" or "error"
"message": str # "data dropped" on success, error details on failure
}
Implementation specific:
- On success: return {"status": "success", "message": "data dropped"}
- On failure: return {"status": "error", "message": "<error details>"}
- If not supported: return {"status": "error", "message": "unsupported"}
"""
@dataclass
class BaseVectorStorage(StorageNameSpace, ABC):

View File

@@ -1,4 +1,5 @@
import asyncio
import os
from dataclasses import dataclass
from typing import Any, final
import numpy as np
@@ -10,8 +11,8 @@ import pipmaster as pm
if not pm.is_installed("chromadb"):
pm.install("chromadb")
from chromadb import HttpClient, PersistentClient
from chromadb.config import Settings
from chromadb import HttpClient, PersistentClient # type: ignore
from chromadb.config import Settings # type: ignore
@final
@@ -335,3 +336,26 @@ class ChromaVectorDBStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return []
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources
This method will delete all documents from the ChromaDB collection.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
# Get all IDs in the collection
result = self._collection.get(include=[])
if result and result["ids"] and len(result["ids"]) > 0:
# Delete all documents
self._collection.delete(ids=result["ids"])
logger.info(f"Process {os.getpid()} drop ChromaDB collection {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -429,3 +429,38 @@ class FaissVectorDBStorage(BaseVectorStorage):
results.append({**metadata, "id": metadata.get("__id__")})
return results
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources
This method will remove all vectors from the Faiss index and delete the storage files.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._storage_lock:
# Reset the index
self._index = faiss.IndexFlatIP(self._dim)
self._id_to_meta = {}
# Remove storage files if they exist
if os.path.exists(self._faiss_index_file):
os.remove(self._faiss_index_file)
if os.path.exists(self._meta_file):
os.remove(self._meta_file)
self._id_to_meta = {}
self._load_faiss_index()
# Notify other processes
await set_all_update_flags(self.namespace)
self.storage_updated.value = False
logger.info(f"Process {os.getpid()} drop FAISS index {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping FAISS index {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -129,9 +129,25 @@ class JsonDocStatusStorage(DocStatusStorage):
await set_all_update_flags(self.namespace)
await self.index_done_callback()
async def drop(self) -> None:
"""Drop the storage"""
async def drop(self) -> dict[str, str]:
"""Drop all document status data from storage and clean up resources
This method will:
1. Clear all document status data from memory
2. Update flags to notify other processes
3. Trigger index_done_callback to save the empty state
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.namespace)
self._data.update({})
await self.index_done_callback()
logger.info(f"Process {os.getpid()} drop {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -127,3 +127,26 @@ class JsonKVStorage(BaseKVStorage):
self._data.pop(doc_id, None)
await set_all_update_flags(self.namespace)
await self.index_done_callback()
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This method will:
1. Clear all data from memory
2. Update flags to notify other processes
3. Trigger index_done_callback to save the empty state
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._storage_lock:
self._data.update({})
await self.index_done_callback()
logger.info(f"Process {os.getpid()} drop {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -15,7 +15,7 @@ if not pm.is_installed("pymilvus"):
pm.install("pymilvus")
import configparser
from pymilvus import MilvusClient
from pymilvus import MilvusClient # type: ignore
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
@@ -287,3 +287,31 @@ class MilvusVectorDBStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return []
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources
This method will delete all data from the Milvus collection.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
# Drop the collection and recreate it
if self._client.has_collection(self.namespace):
self._client.drop_collection(self.namespace)
# Recreate the collection
MilvusVectorDBStorage.create_collection_if_not_exist(
self._client,
self.namespace,
dimension=self.embedding_func.embedding_dim,
)
logger.info(f"Process {os.getpid()} drop Milvus collection {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping Milvus collection {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -280,3 +280,39 @@ class NanoVectorDBStorage(BaseVectorStorage):
client = await self._get_client()
return client.get(ids)
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources
This method will:
1. Remove the vector database storage file if it exists
2. Reinitialize the vector database client
3. Update flags to notify other processes
4. Trigger index_done_callback to save the empty state
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._storage_lock:
# delete _client_file_name
if os.path.exists(self._client_file_name):
os.remove(self._client_file_name)
self._client = NanoVectorDB(
self.embedding_func.embedding_dim,
storage_file=self._client_file_name,
)
# Notify other processes that data has been updated
await set_all_update_flags(self.namespace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
logger.info(f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -1028,3 +1028,26 @@ class Neo4JStorage(BaseGraphStorage):
self, algorithm: str
) -> tuple[np.ndarray[Any, Any], list[str]]:
raise NotImplementedError
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This method will delete all nodes and relationships in the Neo4j database.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._driver.session(database=self._DATABASE) as session:
# Delete all nodes and relationships
query = "MATCH (n) DETACH DELETE n"
result = await session.run(query)
await result.consume() # Ensure result is fully consumed
logger.info(f"Process {os.getpid()} drop Neo4j database {self._DATABASE}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -42,6 +42,7 @@ class NetworkXStorage(BaseGraphStorage):
)
nx.write_graphml(graph, file_name)
# TODOdeprecated, remove later
@staticmethod
def _stabilize_graph(graph: nx.Graph) -> nx.Graph:
"""Refer to https://github.com/microsoft/graphrag/index/graph/utils/stable_lcc.py
@@ -424,3 +425,33 @@ class NetworkXStorage(BaseGraphStorage):
return False # Return error
return True
async def drop(self) -> dict[str, str]:
"""Drop all graph data from storage and clean up resources
This method will:
1. Remove the graph storage file if it exists
2. Reset the graph to an empty state
3. Update flags to notify other processes
4. Trigger index_done_callback to save the empty state
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
async with self._storage_lock:
# delete _client_file_name
if os.path.exists(self._graphml_xml_file):
os.remove(self._graphml_xml_file)
self._graph = nx.Graph()
# Notify other processes that data has been updated
await set_all_update_flags(self.namespace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
logger.info(f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping graph {self.namespace}: {e}")
return {"status": "error", "message": str(e)}

View File

@@ -8,18 +8,15 @@ import uuid
from ..utils import logger
from ..base import BaseVectorStorage
import configparser
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
import pipmaster as pm
if not pm.is_installed("qdrant-client"):
pm.install("qdrant-client")
from qdrant_client import QdrantClient, models
from qdrant_client import QdrantClient, models # type: ignore
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
def compute_mdhash_id_for_qdrant(
content: str, prefix: str = "", style: str = "simple"
@@ -275,3 +272,89 @@ class QdrantVectorDBStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error searching for prefix '{prefix}': {e}")
return []
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
Args:
id: The unique identifier of the vector
Returns:
The vector data if found, or None if not found
"""
try:
# Convert to Qdrant compatible ID
qdrant_id = compute_mdhash_id_for_qdrant(id)
# Retrieve the point by ID
result = self._client.retrieve(
collection_name=self.namespace,
ids=[qdrant_id],
with_payload=True,
)
if not result:
return None
return result[0].payload
except Exception as e:
logger.error(f"Error retrieving vector data for ID {id}: {e}")
return None
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
"""Get multiple vector data by their IDs
Args:
ids: List of unique identifiers
Returns:
List of vector data objects that were found
"""
if not ids:
return []
try:
# Convert to Qdrant compatible IDs
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
# Retrieve the points by IDs
results = self._client.retrieve(
collection_name=self.namespace,
ids=qdrant_ids,
with_payload=True,
)
return [point.payload for point in results]
except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return []
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources
This method will delete all data from the Qdrant collection.
Returns:
dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"}
- On failure: {"status": "error", "message": "<error details>"}
"""
try:
# Delete the collection and recreate it
if self._client.collection_exists(self.namespace):
self._client.delete_collection(self.namespace)
# Recreate the collection
QdrantVectorDBStorage.create_collection_if_not_exist(
self._client,
self.namespace,
vectors_config=models.VectorParams(
size=self.embedding_func.embedding_dim, distance=models.Distance.COSINE
),
)
logger.info(f"Process {os.getpid()} drop Qdrant collection {self.namespace}")
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}")
return {"status": "error", "message": str(e)}