From 95a8ee27ed707ab2e0ee4aa440fae3fafbb655a5 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 31 Mar 2025 23:22:27 +0800 Subject: [PATCH] Fix linting --- lightrag/api/routers/document_routes.py | 64 ++++++++++--------- lightrag/base.py | 29 ++++----- lightrag/kg/age_impl.py | 10 +-- lightrag/kg/chroma_impl.py | 16 ++--- lightrag/kg/faiss_impl.py | 10 +-- lightrag/kg/gremlin_impl.py | 12 ++-- lightrag/kg/json_doc_status_impl.py | 14 ++--- lightrag/kg/json_kv_impl.py | 23 ++++--- lightrag/kg/milvus_impl.py | 16 ++--- lightrag/kg/mongo_impl.py | 82 ++++++++++++++++--------- lightrag/kg/nano_vector_db_impl.py | 8 ++- lightrag/kg/neo4j_impl.py | 12 ++-- lightrag/kg/networkx_impl.py | 8 ++- lightrag/kg/oracle_impl.py | 50 +++++++++------ lightrag/kg/postgres_impl.py | 58 ++++++++++------- lightrag/kg/qdrant_impl.py | 36 ++++++----- lightrag/kg/redis_impl.py | 24 ++++---- lightrag/kg/tidb_impl.py | 46 ++++++++------ 18 files changed, 296 insertions(+), 222 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 987695f7..144b1274 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -60,7 +60,9 @@ class InsertResponse(BaseModel): class ClearDocumentsResponse(BaseModel): - status: str = Field(description="Status of the clear operation: success/partial_success/busy/fail") + status: str = Field( + description="Status of the clear operation: success/partial_success/busy/fail" + ) message: str = Field(description="Message describing the operation result") @@ -448,7 +450,7 @@ async def pipeline_index_texts(rag: LightRAG, texts: List[str]): await rag.apipeline_process_enqueue_documents() -# TODO: deprecate after /insert_file is removed +# TODO: deprecate after /insert_file is removed async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path: """Save the uploaded file to a temporary location @@ -783,7 +785,10 @@ def create_document_routes( HTTPException: Raised when a serious error occurs during the clearing process, with status code 500 and error details in the detail field. """ - from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_pipeline_status_lock, + ) # Get pipeline status and lock pipeline_status = await get_namespace_data("pipeline_status") @@ -794,14 +799,16 @@ def create_document_routes( if pipeline_status.get("busy", False): return ClearDocumentsResponse( status="busy", - message="Cannot clear documents while pipeline is busy" + message="Cannot clear documents while pipeline is busy", ) # Set busy to true pipeline_status["busy"] = True pipeline_status["job_name"] = "Clearing Documents" pipeline_status["latest_message"] = "Starting document clearing process" if "history_messages" in pipeline_status: - pipeline_status["history_messages"].append("Starting document clearing process") + pipeline_status["history_messages"].append( + "Starting document clearing process" + ) try: # Use drop method to clear all data @@ -813,25 +820,27 @@ def create_document_routes( rag.relationships_vdb, rag.chunks_vdb, rag.chunk_entity_relation_graph, - rag.doc_status + rag.doc_status, ] - + # Log storage drop start if "history_messages" in pipeline_status: - pipeline_status["history_messages"].append("Starting to drop storage components") - + pipeline_status["history_messages"].append( + "Starting to drop storage components" + ) + for storage in storages: if storage is not None: drop_tasks.append(storage.drop()) - + # Wait for all drop tasks to complete drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True) - + # Check for errors and log results errors = [] storage_success_count = 0 storage_error_count = 0 - + for i, result in enumerate(drop_results): storage_name = storages[i].__class__.__name__ if isinstance(result, Exception): @@ -842,7 +851,7 @@ def create_document_routes( else: logger.info(f"Successfully dropped {storage_name}") storage_success_count += 1 - + # Log storage drop results if "history_messages" in pipeline_status: if storage_error_count > 0: @@ -853,26 +862,25 @@ def create_document_routes( pipeline_status["history_messages"].append( f"Successfully dropped all {storage_success_count} storage components" ) - + # If all storage operations failed, return error status and don't proceed with file deletion if storage_success_count == 0 and storage_error_count > 0: error_message = "All storage drop operations failed. Aborting document clearing process." logger.error(error_message) if "history_messages" in pipeline_status: pipeline_status["history_messages"].append(error_message) - return ClearDocumentsResponse( - status="fail", - message=error_message - ) - + return ClearDocumentsResponse(status="fail", message=error_message) + # Log file deletion start if "history_messages" in pipeline_status: - pipeline_status["history_messages"].append("Starting to delete files in input directory") - + pipeline_status["history_messages"].append( + "Starting to delete files in input directory" + ) + # Delete all files in input_dir deleted_files_count = 0 file_errors_count = 0 - + for file_path in doc_manager.input_dir.glob("**/*"): if file_path.is_file(): try: @@ -881,7 +889,7 @@ def create_document_routes( except Exception as e: logger.error(f"Error deleting file {file_path}: {str(e)}") file_errors_count += 1 - + # Log file deletion results if "history_messages" in pipeline_status: if file_errors_count > 0: @@ -893,7 +901,7 @@ def create_document_routes( pipeline_status["history_messages"].append( f"Successfully deleted {deleted_files_count} files" ) - + # Prepare final result message final_message = "" if errors: @@ -903,16 +911,12 @@ def create_document_routes( final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files." status = "success" - # Log final result if "history_messages" in pipeline_status: pipeline_status["history_messages"].append(final_message) - + # Return response based on results - return ClearDocumentsResponse( - status=status, - message=final_message - ) + return ClearDocumentsResponse(status=status, message=final_message) except Exception as e: error_msg = f"Error clearing documents: {str(e)}" logger.error(error_msg) diff --git a/lightrag/base.py b/lightrag/base.py index 05f30d3c..223cc7c9 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -111,11 +111,11 @@ class StorageNameSpace(ABC): @abstractmethod async def index_done_callback(self) -> None: """Commit the storage operations after indexing""" - + @abstractmethod async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources - + This abstract method defines the contract for dropping all data from a storage implementation. Each storage type must implement this method to: 1. Clear all data from memory and/or external storage @@ -124,14 +124,14 @@ class StorageNameSpace(ABC): 4. Handle cleanup of any resources 5. Notify other processes if necessary 6. This action should persistent the data to disk immediately. - + Returns: dict[str, str]: Operation status and message with the following format: { "status": str, # "success" or "error" "message": str # "data dropped" on success, error details on failure } - + Implementation specific: - On success: return {"status": "success", "message": "data dropped"} - On failure: return {"status": "error", "message": ""} @@ -238,42 +238,43 @@ class BaseKVStorage(StorageNameSpace, ABC): @abstractmethod async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """Upsert data - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed + 2. update flags to notify other processes that data persistence is needed """ @abstractmethod async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback 2. update flags to notify other processes that data persistence is needed - + Args: ids (list[str]): List of document IDs to be deleted from storage - + Returns: None """ - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback 2. update flags to notify other processes that data persistence is needed - + Args: modes (list[str]): List of cache modes to be dropped from storage - + Returns: True: if the cache drop successfully False: if the cache drop failed, or the cache mode is not supported """ + @dataclass class BaseGraphStorage(StorageNameSpace, ABC): embedding_func: EmbeddingFunc @@ -394,7 +395,7 @@ class DocStatusStorage(BaseKVStorage, ABC): ) -> dict[str, DocProcessingStatus]: """Get all documents with a specific status""" - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Drop cache is not supported for Doc Status storage""" return False diff --git a/lightrag/kg/age_impl.py b/lightrag/kg/age_impl.py index 8530e12d..b744ae1e 100644 --- a/lightrag/kg/age_impl.py +++ b/lightrag/kg/age_impl.py @@ -34,9 +34,9 @@ if not pm.is_installed("psycopg-pool"): if not pm.is_installed("asyncpg"): pm.install("asyncpg") -import psycopg # type: ignore -from psycopg.rows import namedtuple_row # type: ignore -from psycopg_pool import AsyncConnectionPool, PoolTimeout # type: ignore +import psycopg # type: ignore +from psycopg.rows import namedtuple_row # type: ignore +from psycopg_pool import AsyncConnectionPool, PoolTimeout # type: ignore class AGEQueryException(Exception): @@ -871,10 +871,10 @@ class AGEStorage(BaseGraphStorage): async def index_done_callback(self) -> None: # AGES handles persistence automatically pass - + async def drop(self) -> dict[str, str]: """Drop the storage by removing all nodes and relationships in the graph. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ diff --git a/lightrag/kg/chroma_impl.py b/lightrag/kg/chroma_impl.py index 052088d4..020e358f 100644 --- a/lightrag/kg/chroma_impl.py +++ b/lightrag/kg/chroma_impl.py @@ -11,8 +11,8 @@ import pipmaster as pm if not pm.is_installed("chromadb"): pm.install("chromadb") -from chromadb import HttpClient, PersistentClient # type: ignore -from chromadb.config import Settings # type: ignore +from chromadb import HttpClient, PersistentClient # type: ignore +from chromadb.config import Settings # type: ignore @final @@ -336,12 +336,12 @@ class ChromaVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] - + async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - + This method will delete all documents from the ChromaDB collection. - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -353,8 +353,10 @@ class ChromaVectorDBStorage(BaseVectorStorage): if result and result["ids"] and len(result["ids"]) > 0: # Delete all documents self._collection.delete(ids=result["ids"]) - - logger.info(f"Process {os.getpid()} drop ChromaDB collection {self.namespace}") + + logger.info( + f"Process {os.getpid()} drop ChromaDB collection {self.namespace}" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}") diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index 1e0659cb..cf870b3a 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -443,10 +443,10 @@ class FaissVectorDBStorage(BaseVectorStorage): results.append({**metadata, "id": metadata.get("__id__")}) return results - + async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - + This method will: 1. Remove the vector database storage file if it exists 2. Reinitialize the vector database client @@ -454,7 +454,7 @@ class FaissVectorDBStorage(BaseVectorStorage): 4. Changes is persisted to disk immediately This method will remove all vectors from the Faiss index and delete the storage files. - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -465,7 +465,7 @@ class FaissVectorDBStorage(BaseVectorStorage): # Reset the index self._index = faiss.IndexFlatIP(self._dim) self._id_to_meta = {} - + # Remove storage files if they exist if os.path.exists(self._faiss_index_file): os.remove(self._faiss_index_file) @@ -478,7 +478,7 @@ class FaissVectorDBStorage(BaseVectorStorage): # Notify other processes await set_all_update_flags(self.namespace) self.storage_updated.value = False - + logger.info(f"Process {os.getpid()} drop FAISS index {self.namespace}") return {"status": "success", "message": "data dropped"} except Exception as e: diff --git a/lightrag/kg/gremlin_impl.py b/lightrag/kg/gremlin_impl.py index d616a409..e27c561e 100644 --- a/lightrag/kg/gremlin_impl.py +++ b/lightrag/kg/gremlin_impl.py @@ -24,9 +24,9 @@ from ..base import BaseGraphStorage if not pm.is_installed("gremlinpython"): pm.install("gremlinpython") -from gremlin_python.driver import client, serializer # type: ignore -from gremlin_python.driver.aiohttp.transport import AiohttpTransport # type: ignore -from gremlin_python.driver.protocol import GremlinServerError # type: ignore +from gremlin_python.driver import client, serializer # type: ignore +from gremlin_python.driver.aiohttp.transport import AiohttpTransport # type: ignore +from gremlin_python.driver.protocol import GremlinServerError # type: ignore @final @@ -695,13 +695,13 @@ class GremlinStorage(BaseGraphStorage): except Exception as e: logger.error(f"Error during edge deletion: {str(e)}") raise - + async def drop(self) -> dict[str, str]: """Drop the storage by removing all nodes and relationships in the graph. - + This function deletes all nodes with the specified graph name property, which automatically removes all associated edges. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 003a6733..a1d10a62 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -112,7 +112,7 @@ class JsonDocStatusStorage(DocStatusStorage): """ Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed + 2. update flags to notify other processes that data persistence is needed """ if not data: return @@ -129,14 +129,14 @@ class JsonDocStatusStorage(DocStatusStorage): async def delete(self, doc_ids: list[str]) -> None: """Delete specific records from storage by their IDs - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed - + 2. update flags to notify other processes that data persistence is needed + Args: ids (list[str]): List of document IDs to be deleted from storage - + Returns: None """ @@ -147,12 +147,12 @@ class JsonDocStatusStorage(DocStatusStorage): async def drop(self) -> dict[str, str]: """Drop all document status data from storage and clean up resources - + This method will: 1. Clear all document status data from memory 2. Update flags to notify other processes 3. Trigger index_done_callback to save the empty state - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 8857aa9a..79a043c8 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -117,7 +117,7 @@ class JsonKVStorage(BaseKVStorage): """ Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed + 2. update flags to notify other processes that data persistence is needed """ if not data: return @@ -128,14 +128,14 @@ class JsonKVStorage(BaseKVStorage): async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback 2. update flags to notify other processes that data persistence is needed - + Args: ids (list[str]): List of document IDs to be deleted from storage - + Returns: None """ @@ -144,39 +144,38 @@ class JsonKVStorage(BaseKVStorage): self._data.pop(doc_id, None) await set_all_update_flags(self.namespace) - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by by cache mode - + Importance notes for in-memory storage: 1. Changes will be persisted to disk during the next index_done_callback 2. update flags to notify other processes that data persistence is needed - + Args: ids (list[str]): List of cache mode to be drop from storage - + Returns: True: if the cache drop successfully False: if the cache drop failed """ if not modes: return False - + try: await self.delete(modes) return True except Exception: return False - async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources This action will persistent the data to disk immediately. - + This method will: 1. Clear all data from memory 2. Update flags to notify other processes 3. Trigger index_done_callback to save the empty state - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 74cf416a..2cff0079 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -15,7 +15,7 @@ if not pm.is_installed("pymilvus"): pm.install("pymilvus") import configparser -from pymilvus import MilvusClient # type: ignore +from pymilvus import MilvusClient # type: ignore config = configparser.ConfigParser() config.read("config.ini", "utf-8") @@ -287,12 +287,12 @@ class MilvusVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] - + async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - + This method will delete all data from the Milvus collection. - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -302,15 +302,17 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Drop the collection and recreate it if self._client.has_collection(self.namespace): self._client.drop_collection(self.namespace) - + # Recreate the collection MilvusVectorDBStorage.create_collection_if_not_exist( self._client, self.namespace, dimension=self.embedding_func.embedding_dim, ) - - logger.info(f"Process {os.getpid()} drop Milvus collection {self.namespace}") + + logger.info( + f"Process {os.getpid()} drop Milvus collection {self.namespace}" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping Milvus collection {self.namespace}: {e}") diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 24c215be..dd4f7447 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -25,13 +25,13 @@ if not pm.is_installed("pymongo"): if not pm.is_installed("motor"): pm.install("motor") -from motor.motor_asyncio import ( # type: ignore +from motor.motor_asyncio import ( # type: ignore AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection, ) -from pymongo.operations import SearchIndexModel # type: ignore -from pymongo.errors import PyMongoError # type: ignore +from pymongo.operations import SearchIndexModel # type: ignore +from pymongo.errors import PyMongoError # type: ignore config = configparser.ConfigParser() config.read("config.ini", "utf-8") @@ -149,34 +149,36 @@ class MongoKVStorage(BaseKVStorage): async def index_done_callback(self) -> None: # Mongo handles persistence automatically pass - + async def delete(self, ids: list[str]) -> None: """Delete documents with specified IDs - + Args: ids: List of document IDs to be deleted """ if not ids: return - + try: result = await self._data.delete_many({"_id": {"$in": ids}}) - logger.info(f"Deleted {result.deleted_count} documents from {self.namespace}") + logger.info( + f"Deleted {result.deleted_count} documents from {self.namespace}" + ) except PyMongoError as e: logger.error(f"Error deleting documents from {self.namespace}: {e}") - + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode - + Args: modes (list[str]): List of cache modes to be dropped from storage - + Returns: bool: True if successful, False otherwise """ if not modes: return False - + try: # Build regex pattern to match documents with the specified modes pattern = f"^({'|'.join(modes)})_" @@ -189,16 +191,21 @@ class MongoKVStorage(BaseKVStorage): async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ try: result = await self._data.delete_many({}) deleted_count = result.deleted_count - - logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}") - return {"status": "success", "message": f"{deleted_count} documents dropped"} + + logger.info( + f"Dropped {deleted_count} documents from doc status {self._collection_name}" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped", + } except PyMongoError as e: logger.error(f"Error dropping doc status {self._collection_name}: {e}") return {"status": "error", "message": str(e)} @@ -282,19 +289,24 @@ class MongoDocStatusStorage(DocStatusStorage): async def index_done_callback(self) -> None: # Mongo handles persistence automatically pass - + async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ try: result = await self._data.delete_many({}) deleted_count = result.deleted_count - - logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}") - return {"status": "success", "message": f"{deleted_count} documents dropped"} + + logger.info( + f"Dropped {deleted_count} documents from doc status {self._collection_name}" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped", + } except PyMongoError as e: logger.error(f"Error dropping doc status {self._collection_name}: {e}") return {"status": "error", "message": str(e)} @@ -911,16 +923,21 @@ class MongoGraphStorage(BaseGraphStorage): async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ try: result = await self.collection.delete_many({}) deleted_count = result.deleted_count - - logger.info(f"Dropped {deleted_count} documents from graph {self._collection_name}") - return {"status": "success", "message": f"{deleted_count} documents dropped"} + + logger.info( + f"Dropped {deleted_count} documents from graph {self._collection_name}" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped", + } except PyMongoError as e: logger.error(f"Error dropping graph {self._collection_name}: {e}") return {"status": "error", "message": str(e)} @@ -1211,10 +1228,10 @@ class MongoVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] - + async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection and recreating vector index. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ @@ -1222,12 +1239,17 @@ class MongoVectorDBStorage(BaseVectorStorage): # Delete all documents result = await self._data.delete_many({}) deleted_count = result.deleted_count - + # Recreate vector index await self.create_vector_index_if_not_exists() - - logger.info(f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index") - return {"status": "success", "message": f"{deleted_count} documents dropped and vector index recreated"} + + logger.info( + f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped and vector index recreated", + } except PyMongoError as e: logger.error(f"Error dropping vector storage {self._collection_name}: {e}") return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 8c00437d..56a52b92 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -309,7 +309,7 @@ class NanoVectorDBStorage(BaseVectorStorage): async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - + This method will: 1. Remove the vector database storage file if it exists 2. Reinitialize the vector database client @@ -317,7 +317,7 @@ class NanoVectorDBStorage(BaseVectorStorage): 4. Changes is persisted to disk immediately This method is intended for use in scenarios where all data needs to be removed, - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -339,7 +339,9 @@ class NanoVectorDBStorage(BaseVectorStorage): # Reset own update flag to avoid self-reloading self.storage_updated.value = False - logger.info(f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})") + logger.info( + f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping {self.namespace}: {e}") diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 3f2545a7..4ee88da2 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -1028,12 +1028,12 @@ class Neo4JStorage(BaseGraphStorage): self, algorithm: str ) -> tuple[np.ndarray[Any, Any], list[str]]: raise NotImplementedError - + async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources - + This method will delete all nodes and relationships in the Neo4j database. - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -1045,8 +1045,10 @@ class Neo4JStorage(BaseGraphStorage): query = "MATCH (n) DETACH DELETE n" result = await session.run(query) await result.consume() # Ensure result is fully consumed - - logger.info(f"Process {os.getpid()} drop Neo4j database {self._DATABASE}") + + logger.info( + f"Process {os.getpid()} drop Neo4j database {self._DATABASE}" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}") diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index 0baa72a3..7a9cb203 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -457,13 +457,13 @@ class NetworkXStorage(BaseGraphStorage): async def drop(self) -> dict[str, str]: """Drop all graph data from storage and clean up resources - + This method will: 1. Remove the graph storage file if it exists 2. Reset the graph to an empty state 3. Update flags to notify other processes 4. Changes is persisted to disk immediately - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -479,7 +479,9 @@ class NetworkXStorage(BaseGraphStorage): await set_all_update_flags(self.namespace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False - logger.info(f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})") + logger.info( + f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping graph {self.namespace}: {e}") diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py index 6d3e2e8d..7ba9f428 100644 --- a/lightrag/kg/oracle_impl.py +++ b/lightrag/kg/oracle_impl.py @@ -27,7 +27,7 @@ if not pm.is_installed("oracledb"): pm.install("oracledb") from graspologic import embed -import oracledb # type: ignore +import oracledb # type: ignore class OracleDB: @@ -406,43 +406,45 @@ class OracleKVStorage(BaseKVStorage): if not table_name: logger.error(f"Unknown namespace for deletion: {self.namespace}") return - + ids_list = ",".join([f"'{id}'" for id in ids]) delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})" - + await self.db.execute(delete_sql, {"workspace": self.db.workspace}) - logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") + logger.info( + f"Successfully deleted {len(ids)} records from {self.namespace}" + ) except Exception as e: logger.error(f"Error deleting records from {self.namespace}: {e}") async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode - + Args: modes (list[str]): List of cache modes to be dropped from storage - + Returns: bool: True if successful, False otherwise """ if not modes: return False - + try: table_name = namespace_to_table_name(self.namespace) if not table_name: return False - + if table_name != "LIGHTRAG_LLM_CACHE": return False - + # 构建Oracle风格的IN查询 modes_list = ", ".join([f"'{mode}'" for mode in modes]) sql = f""" DELETE FROM {table_name} - WHERE workspace = :workspace + WHERE workspace = :workspace AND cache_mode IN ({modes_list}) """ - + logger.info(f"Deleting cache by modes: {modes}") await self.db.execute(sql, {"workspace": self.db.workspace}) return True @@ -455,8 +457,11 @@ class OracleKVStorage(BaseKVStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -683,8 +688,11 @@ class OracleVectorDBStorage(BaseVectorStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -1025,12 +1033,16 @@ class OracleGraphStorage(BaseGraphStorage): """Drop the storage""" try: # 使用图形查询删除所有节点和关系 - delete_edges_sql = """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace""" + delete_edges_sql = ( + """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace""" + ) await self.db.execute(delete_edges_sql, {"workspace": self.db.workspace}) - - delete_nodes_sql = """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace""" + + delete_nodes_sql = ( + """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace""" + ) await self.db.execute(delete_nodes_sql, {"workspace": self.db.workspace}) - + return {"status": "success", "message": "graph data dropped"} except Exception as e: logger.error(f"Error dropping graph: {e}") diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 34268e32..9e4f20bd 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -380,10 +380,10 @@ class PGKVStorage(BaseKVStorage): async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs - + Args: ids (list[str]): List of document IDs to be deleted from storage - + Returns: None """ @@ -398,40 +398,41 @@ class PGKVStorage(BaseKVStorage): delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" try: - await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) - logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}") + await self.db.execute( + delete_sql, {"workspace": self.db.workspace, "ids": ids} + ) + logger.debug( + f"Successfully deleted {len(ids)} records from {self.namespace}" + ) except Exception as e: logger.error(f"Error while deleting records from {self.namespace}: {e}") async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode - + Args: modes (list[str]): List of cache modes to be dropped from storage - + Returns: bool: True if successful, False otherwise """ if not modes: return False - + try: table_name = namespace_to_table_name(self.namespace) if not table_name: return False - + if table_name != "LIGHTRAG_LLM_CACHE": return False - + sql = f""" DELETE FROM {table_name} WHERE workspace = $1 AND mode = ANY($2) """ - params = { - "workspace": self.db.workspace, - "modes": modes - } - + params = {"workspace": self.db.workspace, "modes": modes} + logger.info(f"Deleting cache by modes: {modes}") await self.db.execute(sql, params) return True @@ -444,8 +445,11 @@ class PGKVStorage(BaseKVStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -622,7 +626,9 @@ class PGVectorStorage(BaseVectorStorage): delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" try: - await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) + await self.db.execute( + delete_sql, {"workspace": self.db.workspace, "ids": ids} + ) logger.debug( f"Successfully deleted {len(ids)} vectors from {self.namespace}" ) @@ -759,8 +765,11 @@ class PGVectorStorage(BaseVectorStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -930,8 +939,11 @@ class PGDocStatusStorage(DocStatusStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -1626,7 +1638,7 @@ class PGGraphStorage(BaseGraphStorage): MATCH (n) DETACH DELETE n $$) AS (result agtype)""" - + await self._query(drop_query, readonly=False) return {"status": "success", "message": "graph data dropped"} except Exception as e: @@ -1812,7 +1824,7 @@ SQL_TEMPLATES = { chunk_ids=EXCLUDED.chunk_ids, file_path=EXCLUDED.file_path, update_time = CURRENT_TIMESTAMP - """, + """, "relationships": """ WITH relevant_chunks AS ( SELECT id as chunk_id diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py index 855b98ae..d758ca5c 100644 --- a/lightrag/kg/qdrant_impl.py +++ b/lightrag/kg/qdrant_impl.py @@ -13,11 +13,12 @@ import pipmaster as pm if not pm.is_installed("qdrant-client"): pm.install("qdrant-client") -from qdrant_client import QdrantClient, models # type: ignore +from qdrant_client import QdrantClient, models # type: ignore config = configparser.ConfigParser() config.read("config.ini", "utf-8") + def compute_mdhash_id_for_qdrant( content: str, prefix: str = "", style: str = "simple" ) -> str: @@ -272,7 +273,7 @@ class QdrantVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error searching for prefix '{prefix}': {e}") return [] - + async def get_by_id(self, id: str) -> dict[str, Any] | None: """Get vector data by its ID @@ -285,22 +286,22 @@ class QdrantVectorDBStorage(BaseVectorStorage): try: # Convert to Qdrant compatible ID qdrant_id = compute_mdhash_id_for_qdrant(id) - + # Retrieve the point by ID result = self._client.retrieve( collection_name=self.namespace, ids=[qdrant_id], with_payload=True, ) - + if not result: return None - + return result[0].payload except Exception as e: logger.error(f"Error retrieving vector data for ID {id}: {e}") return None - + async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: """Get multiple vector data by their IDs @@ -312,28 +313,28 @@ class QdrantVectorDBStorage(BaseVectorStorage): """ if not ids: return [] - + try: # Convert to Qdrant compatible IDs qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] - + # Retrieve the points by IDs results = self._client.retrieve( collection_name=self.namespace, ids=qdrant_ids, with_payload=True, ) - + return [point.payload for point in results] except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") return [] - + async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources - + This method will delete all data from the Qdrant collection. - + Returns: dict[str, str]: Operation status and message - On success: {"status": "success", "message": "data dropped"} @@ -343,17 +344,20 @@ class QdrantVectorDBStorage(BaseVectorStorage): # Delete the collection and recreate it if self._client.collection_exists(self.namespace): self._client.delete_collection(self.namespace) - + # Recreate the collection QdrantVectorDBStorage.create_collection_if_not_exist( self._client, self.namespace, vectors_config=models.VectorParams( - size=self.embedding_func.embedding_dim, distance=models.Distance.COSINE + size=self.embedding_func.embedding_dim, + distance=models.Distance.COSINE, ), ) - - logger.info(f"Process {os.getpid()} drop Qdrant collection {self.namespace}") + + logger.info( + f"Process {os.getpid()} drop Qdrant collection {self.namespace}" + ) return {"status": "success", "message": "data dropped"} except Exception as e: logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}") diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 964b0ad7..4452d55f 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -8,7 +8,7 @@ if not pm.is_installed("redis"): pm.install("redis") # aioredis is a depricated library, replaced with redis -from redis.asyncio import Redis # type: ignore +from redis.asyncio import Redis # type: ignore from lightrag.utils import logger from lightrag.base import BaseKVStorage import json @@ -83,51 +83,51 @@ class RedisKVStorage(BaseKVStorage): logger.info( f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}" ) - - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: + + async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by by cache mode - + Importance notes for Redis storage: 1. This will immediately delete the specified cache modes from Redis - + Args: modes (list[str]): List of cache mode to be drop from storage - + Returns: True: if the cache drop successfully False: if the cache drop failed """ if not modes: return False - + try: await self.delete(modes) return True except Exception: return False - + async def drop(self) -> dict[str, str]: """Drop the storage by removing all keys under the current namespace. - + Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ try: keys = await self._redis.keys(f"{self.namespace}:*") - + if keys: pipe = self._redis.pipeline() for key in keys: pipe.delete(key) results = await pipe.execute() deleted_count = sum(results) - + logger.info(f"Dropped {deleted_count} keys from {self.namespace}") return {"status": "success", "message": f"{deleted_count} keys dropped"} else: logger.info(f"No keys found to drop in {self.namespace}") return {"status": "success", "message": "no keys to drop"} - + except Exception as e: logger.error(f"Error dropping keys from {self.namespace}: {e}") return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index 3d06ad0a..e57357de 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -20,7 +20,7 @@ if not pm.is_installed("pymysql"): if not pm.is_installed("sqlalchemy"): pm.install("sqlalchemy") -from sqlalchemy import create_engine, text # type: ignore +from sqlalchemy import create_engine, text # type: ignore class TiDB: @@ -290,47 +290,49 @@ class TiDBKVStorage(BaseKVStorage): try: table_name = namespace_to_table_name(self.namespace) id_field = namespace_to_id(self.namespace) - + if not table_name or not id_field: logger.error(f"Unknown namespace for deletion: {self.namespace}") return - + ids_list = ",".join([f"'{id}'" for id in ids]) delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})" - + await self.db.execute(delete_sql, {"workspace": self.db.workspace}) - logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}") + logger.info( + f"Successfully deleted {len(ids)} records from {self.namespace}" + ) except Exception as e: logger.error(f"Error deleting records from {self.namespace}: {e}") async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode - + Args: modes (list[str]): List of cache modes to be dropped from storage - + Returns: bool: True if successful, False otherwise """ if not modes: return False - + try: table_name = namespace_to_table_name(self.namespace) if not table_name: return False - + if table_name != "LIGHTRAG_LLM_CACHE": return False - + # 构建MySQL风格的IN查询 modes_list = ", ".join([f"'{mode}'" for mode in modes]) sql = f""" DELETE FROM {table_name} - WHERE workspace = :workspace + WHERE workspace = :workspace AND mode IN ({modes_list}) """ - + logger.info(f"Deleting cache by modes: {modes}") await self.db.execute(sql, {"workspace": self.db.workspace}) return True @@ -343,8 +345,11 @@ class TiDBKVStorage(BaseKVStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name ) @@ -492,7 +497,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): table_name = namespace_to_table_name(self.namespace) id_field = namespace_to_id(self.namespace) - + if not table_name or not id_field: logger.error(f"Unknown namespace for vector deletion: {self.namespace}") return @@ -502,7 +507,9 @@ class TiDBVectorDBStorage(BaseVectorStorage): try: await self.db.execute(delete_sql, {"workspace": self.db.workspace}) - logger.debug(f"Successfully deleted {len(ids)} vectors from {self.namespace}") + logger.debug( + f"Successfully deleted {len(ids)} vectors from {self.namespace}" + ) except Exception as e: logger.error(f"Error while deleting vectors from {self.namespace}: {e}") @@ -551,8 +558,11 @@ class TiDBVectorDBStorage(BaseVectorStorage): try: table_name = namespace_to_table_name(self.namespace) if not table_name: - return {"status": "error", "message": f"Unknown namespace: {self.namespace}"} - + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( table_name=table_name )