diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index e4ae0a8d..8a9f1f3a 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -25,13 +25,13 @@ if not pm.is_installed("pymongo"): if not pm.is_installed("motor"): pm.install("motor") -from motor.motor_asyncio import ( +from motor.motor_asyncio import ( # type: ignore AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection, ) -from pymongo.operations import SearchIndexModel -from pymongo.errors import PyMongoError +from pymongo.operations import SearchIndexModel # type: ignore +from pymongo.errors import PyMongoError # type: ignore config = configparser.ConfigParser() config.read("config.ini", "utf-8") @@ -149,6 +149,22 @@ class MongoKVStorage(BaseKVStorage): async def index_done_callback(self) -> None: # Mongo handles persistence automatically pass + + async def drop(self) -> dict[str, str]: + """Drop the storage by removing all documents in the collection. + + Returns: + dict[str, str]: Status of the operation with keys 'status' and 'message' + """ + try: + result = await self._data.delete_many({}) + deleted_count = result.deleted_count + + logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}") + return {"status": "success", "message": f"{deleted_count} documents dropped"} + except PyMongoError as e: + logger.error(f"Error dropping doc status {self._collection_name}: {e}") + return {"status": "error", "message": str(e)} @final @@ -229,6 +245,22 @@ class MongoDocStatusStorage(DocStatusStorage): async def index_done_callback(self) -> None: # Mongo handles persistence automatically pass + + async def drop(self) -> dict[str, str]: + """Drop the storage by removing all documents in the collection. + + Returns: + dict[str, str]: Status of the operation with keys 'status' and 'message' + """ + try: + result = await self._data.delete_many({}) + deleted_count = result.deleted_count + + logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}") + return {"status": "success", "message": f"{deleted_count} documents dropped"} + except PyMongoError as e: + logger.error(f"Error dropping doc status {self._collection_name}: {e}") + return {"status": "error", "message": str(e)} @final @@ -840,6 +872,22 @@ class MongoGraphStorage(BaseGraphStorage): logger.debug(f"Successfully deleted edges: {edges}") + async def drop(self) -> dict[str, str]: + """Drop the storage by removing all documents in the collection. + + Returns: + dict[str, str]: Status of the operation with keys 'status' and 'message' + """ + try: + result = await self.collection.delete_many({}) + deleted_count = result.deleted_count + + logger.info(f"Dropped {deleted_count} documents from graph {self._collection_name}") + return {"status": "success", "message": f"{deleted_count} documents dropped"} + except PyMongoError as e: + logger.error(f"Error dropping graph {self._collection_name}: {e}") + return {"status": "error", "message": str(e)} + @final @dataclass @@ -1126,6 +1174,26 @@ class MongoVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] + + async def drop(self) -> dict[str, str]: + """Drop the storage by removing all documents in the collection and recreating vector index. + + Returns: + dict[str, str]: Status of the operation with keys 'status' and 'message' + """ + try: + # Delete all documents + result = await self._data.delete_many({}) + deleted_count = result.deleted_count + + # Recreate vector index + await self.create_vector_index_if_not_exists() + + logger.info(f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index") + return {"status": "success", "message": f"{deleted_count} documents dropped and vector index recreated"} + except PyMongoError as e: + logger.error(f"Error dropping vector storage {self._collection_name}: {e}") + return {"status": "error", "message": str(e)} async def get_or_create_collection(db: AsyncIOMotorDatabase, collection_name: str):