Add drop support for all storage type implementation for Mongo DB

This commit is contained in:
yangdx
2025-03-31 02:10:58 +08:00
parent b411ce2fed
commit 078cee390c

View File

@@ -25,13 +25,13 @@ if not pm.is_installed("pymongo"):
if not pm.is_installed("motor"): if not pm.is_installed("motor"):
pm.install("motor") pm.install("motor")
from motor.motor_asyncio import ( from motor.motor_asyncio import ( # type: ignore
AsyncIOMotorClient, AsyncIOMotorClient,
AsyncIOMotorDatabase, AsyncIOMotorDatabase,
AsyncIOMotorCollection, AsyncIOMotorCollection,
) )
from pymongo.operations import SearchIndexModel from pymongo.operations import SearchIndexModel # type: ignore
from pymongo.errors import PyMongoError from pymongo.errors import PyMongoError # type: ignore
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
@@ -150,6 +150,22 @@ class MongoKVStorage(BaseKVStorage):
# Mongo handles persistence automatically # Mongo handles persistence automatically
pass pass
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.
Returns:
dict[str, str]: Status of the operation with keys 'status' and 'message'
"""
try:
result = await self._data.delete_many({})
deleted_count = result.deleted_count
logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
return {"status": "success", "message": f"{deleted_count} documents dropped"}
except PyMongoError as e:
logger.error(f"Error dropping doc status {self._collection_name}: {e}")
return {"status": "error", "message": str(e)}
@final @final
@dataclass @dataclass
@@ -230,6 +246,22 @@ class MongoDocStatusStorage(DocStatusStorage):
# Mongo handles persistence automatically # Mongo handles persistence automatically
pass pass
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.
Returns:
dict[str, str]: Status of the operation with keys 'status' and 'message'
"""
try:
result = await self._data.delete_many({})
deleted_count = result.deleted_count
logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
return {"status": "success", "message": f"{deleted_count} documents dropped"}
except PyMongoError as e:
logger.error(f"Error dropping doc status {self._collection_name}: {e}")
return {"status": "error", "message": str(e)}
@final @final
@dataclass @dataclass
@@ -840,6 +872,22 @@ class MongoGraphStorage(BaseGraphStorage):
logger.debug(f"Successfully deleted edges: {edges}") logger.debug(f"Successfully deleted edges: {edges}")
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.
Returns:
dict[str, str]: Status of the operation with keys 'status' and 'message'
"""
try:
result = await self.collection.delete_many({})
deleted_count = result.deleted_count
logger.info(f"Dropped {deleted_count} documents from graph {self._collection_name}")
return {"status": "success", "message": f"{deleted_count} documents dropped"}
except PyMongoError as e:
logger.error(f"Error dropping graph {self._collection_name}: {e}")
return {"status": "error", "message": str(e)}
@final @final
@dataclass @dataclass
@@ -1127,6 +1175,26 @@ class MongoVectorDBStorage(BaseVectorStorage):
logger.error(f"Error retrieving vector data for IDs {ids}: {e}") logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return [] return []
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection and recreating vector index.
Returns:
dict[str, str]: Status of the operation with keys 'status' and 'message'
"""
try:
# Delete all documents
result = await self._data.delete_many({})
deleted_count = result.deleted_count
# Recreate vector index
await self.create_vector_index_if_not_exists()
logger.info(f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index")
return {"status": "success", "message": f"{deleted_count} documents dropped and vector index recreated"}
except PyMongoError as e:
logger.error(f"Error dropping vector storage {self._collection_name}: {e}")
return {"status": "error", "message": str(e)}
async def get_or_create_collection(db: AsyncIOMotorDatabase, collection_name: str): async def get_or_create_collection(db: AsyncIOMotorDatabase, collection_name: str):
collection_names = await db.list_collection_names() collection_names = await db.list_collection_names()