Fix linting

yangdx
2025-03-31 23:22:27 +08:00
parent 3d4f8f67c9
commit 95a8ee27ed
18 changed files with 296 additions and 222 deletions

View File

@@ -60,7 +60,9 @@ class InsertResponse(BaseModel):
 class ClearDocumentsResponse(BaseModel):
-    status: str = Field(description="Status of the clear operation: success/partial_success/busy/fail")
+    status: str = Field(
+        description="Status of the clear operation: success/partial_success/busy/fail"
+    )
     message: str = Field(description="Message describing the operation result")
@@ -448,7 +450,7 @@ async def pipeline_index_texts(rag: LightRAG, texts: List[str]):
     await rag.apipeline_process_enqueue_documents()


 # TODO: deprecate after /insert_file is removed
 async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
     """Save the uploaded file to a temporary location
@@ -783,7 +785,10 @@ def create_document_routes(
             HTTPException: Raised when a serious error occurs during the clearing process,
                 with status code 500 and error details in the detail field.
         """
-        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
+        from lightrag.kg.shared_storage import (
+            get_namespace_data,
+            get_pipeline_status_lock,
+        )

         # Get pipeline status and lock
         pipeline_status = await get_namespace_data("pipeline_status")
@@ -794,14 +799,16 @@ def create_document_routes(
         if pipeline_status.get("busy", False):
             return ClearDocumentsResponse(
                 status="busy",
-                message="Cannot clear documents while pipeline is busy"
+                message="Cannot clear documents while pipeline is busy",
             )

         # Set busy to true
         pipeline_status["busy"] = True
         pipeline_status["job_name"] = "Clearing Documents"
         pipeline_status["latest_message"] = "Starting document clearing process"
         if "history_messages" in pipeline_status:
-            pipeline_status["history_messages"].append("Starting document clearing process")
+            pipeline_status["history_messages"].append(
+                "Starting document clearing process"
+            )

         try:
             # Use drop method to clear all data
@@ -813,25 +820,27 @@ def create_document_routes(
                 rag.relationships_vdb,
                 rag.chunks_vdb,
                 rag.chunk_entity_relation_graph,
-                rag.doc_status
+                rag.doc_status,
             ]

             # Log storage drop start
             if "history_messages" in pipeline_status:
-                pipeline_status["history_messages"].append("Starting to drop storage components")
+                pipeline_status["history_messages"].append(
+                    "Starting to drop storage components"
+                )

             for storage in storages:
                 if storage is not None:
                     drop_tasks.append(storage.drop())

             # Wait for all drop tasks to complete
             drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True)

             # Check for errors and log results
             errors = []
             storage_success_count = 0
             storage_error_count = 0

             for i, result in enumerate(drop_results):
                 storage_name = storages[i].__class__.__name__
                 if isinstance(result, Exception):
@@ -842,7 +851,7 @@ def create_document_routes(
                 else:
                     logger.info(f"Successfully dropped {storage_name}")
                     storage_success_count += 1

             # Log storage drop results
             if "history_messages" in pipeline_status:
                 if storage_error_count > 0:
@@ -853,26 +862,25 @@ def create_document_routes(
pipeline_status["history_messages"].append( pipeline_status["history_messages"].append(
f"Successfully dropped all {storage_success_count} storage components" f"Successfully dropped all {storage_success_count} storage components"
) )
# If all storage operations failed, return error status and don't proceed with file deletion # If all storage operations failed, return error status and don't proceed with file deletion
if storage_success_count == 0 and storage_error_count > 0: if storage_success_count == 0 and storage_error_count > 0:
error_message = "All storage drop operations failed. Aborting document clearing process." error_message = "All storage drop operations failed. Aborting document clearing process."
logger.error(error_message) logger.error(error_message)
if "history_messages" in pipeline_status: if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(error_message) pipeline_status["history_messages"].append(error_message)
return ClearDocumentsResponse( return ClearDocumentsResponse(status="fail", message=error_message)
status="fail",
message=error_message
)
# Log file deletion start # Log file deletion start
if "history_messages" in pipeline_status: if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append("Starting to delete files in input directory") pipeline_status["history_messages"].append(
"Starting to delete files in input directory"
)
# Delete all files in input_dir # Delete all files in input_dir
deleted_files_count = 0 deleted_files_count = 0
file_errors_count = 0 file_errors_count = 0
for file_path in doc_manager.input_dir.glob("**/*"): for file_path in doc_manager.input_dir.glob("**/*"):
if file_path.is_file(): if file_path.is_file():
try: try:
@@ -881,7 +889,7 @@ def create_document_routes(
                     except Exception as e:
                         logger.error(f"Error deleting file {file_path}: {str(e)}")
                         file_errors_count += 1

             # Log file deletion results
             if "history_messages" in pipeline_status:
                 if file_errors_count > 0:
@@ -893,7 +901,7 @@ def create_document_routes(
pipeline_status["history_messages"].append( pipeline_status["history_messages"].append(
f"Successfully deleted {deleted_files_count} files" f"Successfully deleted {deleted_files_count} files"
) )
# Prepare final result message # Prepare final result message
final_message = "" final_message = ""
if errors: if errors:
@@ -903,16 +911,12 @@ def create_document_routes(
final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files." final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
status = "success" status = "success"
# Log final result # Log final result
if "history_messages" in pipeline_status: if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(final_message) pipeline_status["history_messages"].append(final_message)
# Return response based on results # Return response based on results
return ClearDocumentsResponse( return ClearDocumentsResponse(status=status, message=final_message)
status=status,
message=final_message
)
except Exception as e: except Exception as e:
error_msg = f"Error clearing documents: {str(e)}" error_msg = f"Error clearing documents: {str(e)}"
logger.error(error_msg) logger.error(error_msg)
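
Below is a minimal, self-contained sketch of the fan-out pattern this route relies on: asyncio.gather with return_exceptions=True lets every storage drop() run to completion and hands failures back as Exception objects instead of cancelling the batch. The DummyStorage class and all names here are hypothetical stand-ins, not LightRAG types.

import asyncio


class DummyStorage:
    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    async def drop(self) -> dict[str, str]:
        if self.fail:
            raise RuntimeError(f"{self.name} unavailable")
        return {"status": "success", "message": "data dropped"}


async def main() -> None:
    storages = [DummyStorage("kv"), DummyStorage("vdb", fail=True)]
    # return_exceptions=True: one failing drop() does not cancel the others;
    # failures arrive as Exception objects in the results list
    results = await asyncio.gather(
        *(s.drop() for s in storages), return_exceptions=True
    )
    for storage, result in zip(storages, results):
        if isinstance(result, Exception):
            print(f"{storage.name}: error: {result}")
        else:
            print(f"{storage.name}: {result['message']}")


asyncio.run(main())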

View File

@@ -111,11 +111,11 @@ class StorageNameSpace(ABC):
     @abstractmethod
     async def index_done_callback(self) -> None:
         """Commit the storage operations after indexing"""

     @abstractmethod
     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources

         This abstract method defines the contract for dropping all data from a storage implementation.
         Each storage type must implement this method to:
         1. Clear all data from memory and/or external storage
@@ -124,14 +124,14 @@ class StorageNameSpace(ABC):
         4. Handle cleanup of any resources
         5. Notify other processes if necessary
         6. This action should persistent the data to disk immediately.

         Returns:
             dict[str, str]: Operation status and message with the following format:
                 {
                     "status": str,  # "success" or "error"
                     "message": str  # "data dropped" on success, error details on failure
                 }

         Implementation specific:
             - On success: return {"status": "success", "message": "data dropped"}
             - On failure: return {"status": "error", "message": "<error details>"}
@@ -238,42 +238,43 @@ class BaseKVStorage(StorageNameSpace, ABC):
     @abstractmethod
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         """Upsert data

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed
         """

     @abstractmethod
     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed

         Args:
             ids (list[str]): List of document IDs to be deleted from storage

         Returns:
             None
         """

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed

         Args:
             modes (list[str]): List of cache modes to be dropped from storage

         Returns:
             True: if the cache drop successfully
             False: if the cache drop failed, or the cache mode is not supported
         """


 @dataclass
 class BaseGraphStorage(StorageNameSpace, ABC):
     embedding_func: EmbeddingFunc
@@ -394,7 +395,7 @@ class DocStatusStorage(BaseKVStorage, ABC):
     ) -> dict[str, DocProcessingStatus]:
         """Get all documents with a specific status"""

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Drop cache is not supported for Doc Status storage"""
         return False
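
As a rough illustration of the drop() contract spelled out in the docstrings above, here is a minimal in-memory sketch. A real subclass would also persist the empty state and set cross-process update flags, steps this sketch deliberately omits; the class name is hypothetical.

import asyncio
from typing import Any


class InMemoryKVStorage:
    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def drop(self) -> dict[str, str]:
        try:
            # 1. Clear all data from memory (persistence/flag steps omitted here)
            self._data.clear()
            return {"status": "success", "message": "data dropped"}
        except Exception as e:
            return {"status": "error", "message": str(e)}


async def main() -> None:
    storage = InMemoryKVStorage()
    print(await storage.drop())  # {'status': 'success', 'message': 'data dropped'}


asyncio.run(main())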

View File

@@ -34,9 +34,9 @@ if not pm.is_installed("psycopg-pool"):
if not pm.is_installed("asyncpg"): if not pm.is_installed("asyncpg"):
pm.install("asyncpg") pm.install("asyncpg")
import psycopg # type: ignore import psycopg # type: ignore
from psycopg.rows import namedtuple_row # type: ignore from psycopg.rows import namedtuple_row # type: ignore
from psycopg_pool import AsyncConnectionPool, PoolTimeout # type: ignore from psycopg_pool import AsyncConnectionPool, PoolTimeout # type: ignore
class AGEQueryException(Exception): class AGEQueryException(Exception):
@@ -871,10 +871,10 @@ class AGEStorage(BaseGraphStorage):
     async def index_done_callback(self) -> None:
         # AGES handles persistence automatically
         pass

     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all nodes and relationships in the graph.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """

View File

@@ -11,8 +11,8 @@ import pipmaster as pm
if not pm.is_installed("chromadb"): if not pm.is_installed("chromadb"):
pm.install("chromadb") pm.install("chromadb")
from chromadb import HttpClient, PersistentClient # type: ignore from chromadb import HttpClient, PersistentClient # type: ignore
from chromadb.config import Settings # type: ignore from chromadb.config import Settings # type: ignore
@final @final
@@ -336,12 +336,12 @@ class ChromaVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []

     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources

         This method will delete all documents from the ChromaDB collection.

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -353,8 +353,10 @@ class ChromaVectorDBStorage(BaseVectorStorage):
if result and result["ids"] and len(result["ids"]) > 0: if result and result["ids"] and len(result["ids"]) > 0:
# Delete all documents # Delete all documents
self._collection.delete(ids=result["ids"]) self._collection.delete(ids=result["ids"])
logger.info(f"Process {os.getpid()} drop ChromaDB collection {self.namespace}") logger.info(
f"Process {os.getpid()} drop ChromaDB collection {self.namespace}"
)
return {"status": "success", "message": "data dropped"} return {"status": "success", "message": "data dropped"}
except Exception as e: except Exception as e:
logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}") logger.error(f"Error dropping ChromaDB collection {self.namespace}: {e}")

View File

@@ -443,10 +443,10 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 results.append({**metadata, "id": metadata.get("__id__")})

         return results

     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources

         This method will:
         1. Remove the vector database storage file if it exists
         2. Reinitialize the vector database client
@@ -454,7 +454,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
         4. Changes is persisted to disk immediately

         This method will remove all vectors from the Faiss index and delete the storage files.

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -465,7 +465,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
             # Reset the index
             self._index = faiss.IndexFlatIP(self._dim)
             self._id_to_meta = {}

             # Remove storage files if they exist
             if os.path.exists(self._faiss_index_file):
                 os.remove(self._faiss_index_file)
@@ -478,7 +478,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
             # Notify other processes
             await set_all_update_flags(self.namespace)
             self.storage_updated.value = False

             logger.info(f"Process {os.getpid()} drop FAISS index {self.namespace}")
             return {"status": "success", "message": "data dropped"}
         except Exception as e:

View File

@@ -24,9 +24,9 @@ from ..base import BaseGraphStorage
if not pm.is_installed("gremlinpython"): if not pm.is_installed("gremlinpython"):
pm.install("gremlinpython") pm.install("gremlinpython")
from gremlin_python.driver import client, serializer # type: ignore from gremlin_python.driver import client, serializer # type: ignore
from gremlin_python.driver.aiohttp.transport import AiohttpTransport # type: ignore from gremlin_python.driver.aiohttp.transport import AiohttpTransport # type: ignore
from gremlin_python.driver.protocol import GremlinServerError # type: ignore from gremlin_python.driver.protocol import GremlinServerError # type: ignore
@final @final
@@ -695,13 +695,13 @@ class GremlinStorage(BaseGraphStorage):
         except Exception as e:
             logger.error(f"Error during edge deletion: {str(e)}")
             raise

     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all nodes and relationships in the graph.

         This function deletes all nodes with the specified graph name property,
         which automatically removes all associated edges.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """

View File

@@ -112,7 +112,7 @@ class JsonDocStatusStorage(DocStatusStorage):
""" """
Importance notes for in-memory storage: Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback 1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed 2. update flags to notify other processes that data persistence is needed
""" """
if not data: if not data:
return return
@@ -129,14 +129,14 @@ class JsonDocStatusStorage(DocStatusStorage):
     async def delete(self, doc_ids: list[str]) -> None:
         """Delete specific records from storage by their IDs

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed

         Args:
             ids (list[str]): List of document IDs to be deleted from storage

         Returns:
             None
         """
@@ -147,12 +147,12 @@ class JsonDocStatusStorage(DocStatusStorage):
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources

         This method will:
         1. Clear all document status data from memory
         2. Update flags to notify other processes
         3. Trigger index_done_callback to save the empty state

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}

View File

@@ -117,7 +117,7 @@ class JsonKVStorage(BaseKVStorage):
""" """
Importance notes for in-memory storage: Importance notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback 1. Changes will be persisted to disk during the next index_done_callback
2. update flags to notify other processes that data persistence is needed 2. update flags to notify other processes that data persistence is needed
""" """
if not data: if not data:
return return
@@ -128,14 +128,14 @@ class JsonKVStorage(BaseKVStorage):
     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed

         Args:
             ids (list[str]): List of document IDs to be deleted from storage

         Returns:
             None
         """
@@ -144,39 +144,38 @@ class JsonKVStorage(BaseKVStorage):
             self._data.pop(doc_id, None)
         await set_all_update_flags(self.namespace)

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by by cache mode

         Importance notes for in-memory storage:
         1. Changes will be persisted to disk during the next index_done_callback
         2. update flags to notify other processes that data persistence is needed

         Args:
             ids (list[str]): List of cache mode to be drop from storage

         Returns:
             True: if the cache drop successfully
             False: if the cache drop failed
         """
         if not modes:
             return False

         try:
             await self.delete(modes)
             return True
         except Exception:
             return False

     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources
         This action will persistent the data to disk immediately.

         This method will:
         1. Clear all data from memory
         2. Update flags to notify other processes
         3. Trigger index_done_callback to save the empty state

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}

View File

@@ -15,7 +15,7 @@ if not pm.is_installed("pymilvus"):
pm.install("pymilvus") pm.install("pymilvus")
import configparser import configparser
from pymilvus import MilvusClient # type: ignore from pymilvus import MilvusClient # type: ignore
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
@@ -287,12 +287,12 @@ class MilvusVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []

     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources

         This method will delete all data from the Milvus collection.

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -302,15 +302,17 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             # Drop the collection and recreate it
             if self._client.has_collection(self.namespace):
                 self._client.drop_collection(self.namespace)

             # Recreate the collection
             MilvusVectorDBStorage.create_collection_if_not_exist(
                 self._client,
                 self.namespace,
                 dimension=self.embedding_func.embedding_dim,
             )

-            logger.info(f"Process {os.getpid()} drop Milvus collection {self.namespace}")
+            logger.info(
+                f"Process {os.getpid()} drop Milvus collection {self.namespace}"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping Milvus collection {self.namespace}: {e}")

View File

@@ -25,13 +25,13 @@ if not pm.is_installed("pymongo"):
if not pm.is_installed("motor"): if not pm.is_installed("motor"):
pm.install("motor") pm.install("motor")
from motor.motor_asyncio import ( # type: ignore from motor.motor_asyncio import ( # type: ignore
AsyncIOMotorClient, AsyncIOMotorClient,
AsyncIOMotorDatabase, AsyncIOMotorDatabase,
AsyncIOMotorCollection, AsyncIOMotorCollection,
) )
from pymongo.operations import SearchIndexModel # type: ignore from pymongo.operations import SearchIndexModel # type: ignore
from pymongo.errors import PyMongoError # type: ignore from pymongo.errors import PyMongoError # type: ignore
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
@@ -149,34 +149,36 @@ class MongoKVStorage(BaseKVStorage):
     async def index_done_callback(self) -> None:
         # Mongo handles persistence automatically
         pass

     async def delete(self, ids: list[str]) -> None:
         """Delete documents with specified IDs

         Args:
             ids: List of document IDs to be deleted
         """
         if not ids:
             return

         try:
             result = await self._data.delete_many({"_id": {"$in": ids}})
-            logger.info(f"Deleted {result.deleted_count} documents from {self.namespace}")
+            logger.info(
+                f"Deleted {result.deleted_count} documents from {self.namespace}"
+            )
         except PyMongoError as e:
             logger.error(f"Error deleting documents from {self.namespace}: {e}")

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode

         Args:
             modes (list[str]): List of cache modes to be dropped from storage

         Returns:
             bool: True if successful, False otherwise
         """
         if not modes:
             return False

         try:
             # Build regex pattern to match documents with the specified modes
             pattern = f"^({'|'.join(modes)})_"
@@ -189,16 +191,21 @@ class MongoKVStorage(BaseKVStorage):
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count

-            logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+            logger.info(
+                f"Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping doc status {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -282,19 +289,24 @@ class MongoDocStatusStorage(DocStatusStorage):
     async def index_done_callback(self) -> None:
         # Mongo handles persistence automatically
         pass

     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count

-            logger.info(f"Dropped {deleted_count} documents from doc status {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+            logger.info(
+                f"Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping doc status {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -911,16 +923,21 @@ class MongoGraphStorage(BaseGraphStorage):
     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
         try:
             result = await self.collection.delete_many({})
             deleted_count = result.deleted_count

-            logger.info(f"Dropped {deleted_count} documents from graph {self._collection_name}")
-            return {"status": "success", "message": f"{deleted_count} documents dropped"}
+            logger.info(
+                f"Dropped {deleted_count} documents from graph {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping graph {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}
@@ -1211,10 +1228,10 @@ class MongoVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []

     async def drop(self) -> dict[str, str]:
         """Drop the storage by removing all documents in the collection and recreating vector index.

         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
@@ -1222,12 +1239,17 @@ class MongoVectorDBStorage(BaseVectorStorage):
             # Delete all documents
             result = await self._data.delete_many({})
             deleted_count = result.deleted_count

             # Recreate vector index
             await self.create_vector_index_if_not_exists()

-            logger.info(f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index")
-            return {"status": "success", "message": f"{deleted_count} documents dropped and vector index recreated"}
+            logger.info(
+                f"Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped and vector index recreated",
+            }
         except PyMongoError as e:
             logger.error(f"Error dropping vector storage {self._collection_name}: {e}")
             return {"status": "error", "message": str(e)}

View File

@@ -309,7 +309,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
     async def drop(self) -> dict[str, str]:
         """Drop all vector data from storage and clean up resources

         This method will:
         1. Remove the vector database storage file if it exists
         2. Reinitialize the vector database client
@@ -317,7 +317,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
         4. Changes is persisted to disk immediately

         This method is intended for use in scenarios where all data needs to be removed,

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -339,7 +339,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
             # Reset own update flag to avoid self-reloading
             self.storage_updated.value = False

-            logger.info(f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})")
+            logger.info(
+                f"Process {os.getpid()} drop {self.namespace}(file:{self._client_file_name})"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping {self.namespace}: {e}")

View File

@@ -1028,12 +1028,12 @@ class Neo4JStorage(BaseGraphStorage):
         self, algorithm: str
     ) -> tuple[np.ndarray[Any, Any], list[str]]:
         raise NotImplementedError

     async def drop(self) -> dict[str, str]:
         """Drop all data from storage and clean up resources

         This method will delete all nodes and relationships in the Neo4j database.

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -1045,8 +1045,10 @@ class Neo4JStorage(BaseGraphStorage):
query = "MATCH (n) DETACH DELETE n" query = "MATCH (n) DETACH DELETE n"
result = await session.run(query) result = await session.run(query)
await result.consume() # Ensure result is fully consumed await result.consume() # Ensure result is fully consumed
logger.info(f"Process {os.getpid()} drop Neo4j database {self._DATABASE}") logger.info(
f"Process {os.getpid()} drop Neo4j database {self._DATABASE}"
)
return {"status": "success", "message": "data dropped"} return {"status": "success", "message": "data dropped"}
except Exception as e: except Exception as e:
logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}") logger.error(f"Error dropping Neo4j database {self._DATABASE}: {e}")

View File

@@ -457,13 +457,13 @@ class NetworkXStorage(BaseGraphStorage):
     async def drop(self) -> dict[str, str]:
         """Drop all graph data from storage and clean up resources

         This method will:
         1. Remove the graph storage file if it exists
         2. Reset the graph to an empty state
         3. Update flags to notify other processes
         4. Changes is persisted to disk immediately

         Returns:
             dict[str, str]: Operation status and message
             - On success: {"status": "success", "message": "data dropped"}
@@ -479,7 +479,9 @@ class NetworkXStorage(BaseGraphStorage):
             await set_all_update_flags(self.namespace)
             # Reset own update flag to avoid self-reloading
             self.storage_updated.value = False

-            logger.info(f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})")
+            logger.info(
+                f"Process {os.getpid()} drop graph {self.namespace} (file:{self._graphml_xml_file})"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping graph {self.namespace}: {e}")

View File

@@ -27,7 +27,7 @@ if not pm.is_installed("oracledb"):
pm.install("oracledb") pm.install("oracledb")
from graspologic import embed from graspologic import embed
import oracledb # type: ignore import oracledb # type: ignore
class OracleDB: class OracleDB:
@@ -406,43 +406,45 @@ class OracleKVStorage(BaseKVStorage):
             if not table_name:
                 logger.error(f"Unknown namespace for deletion: {self.namespace}")
                 return

             ids_list = ",".join([f"'{id}'" for id in ids])
             delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})"

             await self.db.execute(delete_sql, {"workspace": self.db.workspace})
-            logger.info(f"Successfully deleted {len(ids)} records from {self.namespace}")
+            logger.info(
+                f"Successfully deleted {len(ids)} records from {self.namespace}"
+            )
         except Exception as e:
             logger.error(f"Error deleting records from {self.namespace}: {e}")

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode

         Args:
             modes (list[str]): List of cache modes to be dropped from storage

         Returns:
             bool: True if successful, False otherwise
         """
         if not modes:
             return False

         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
                 return False

             if table_name != "LIGHTRAG_LLM_CACHE":
                 return False

             # Build an Oracle-style IN query
             modes_list = ", ".join([f"'{mode}'" for mode in modes])
             sql = f"""
             DELETE FROM {table_name}
             WHERE workspace = :workspace
             AND cache_mode IN ({modes_list})
             """

             logger.info(f"Deleting cache by modes: {modes}")
             await self.db.execute(sql, {"workspace": self.db.workspace})
             return True
@@ -455,8 +457,11 @@ class OracleKVStorage(BaseKVStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }

             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -683,8 +688,11 @@ class OracleVectorDBStorage(BaseVectorStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }

             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -1025,12 +1033,16 @@ class OracleGraphStorage(BaseGraphStorage):
"""Drop the storage""" """Drop the storage"""
try: try:
# 使用图形查询删除所有节点和关系 # 使用图形查询删除所有节点和关系
delete_edges_sql = """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace""" delete_edges_sql = (
"""DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace"""
)
await self.db.execute(delete_edges_sql, {"workspace": self.db.workspace}) await self.db.execute(delete_edges_sql, {"workspace": self.db.workspace})
delete_nodes_sql = """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace""" delete_nodes_sql = (
"""DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace"""
)
await self.db.execute(delete_nodes_sql, {"workspace": self.db.workspace}) await self.db.execute(delete_nodes_sql, {"workspace": self.db.workspace})
return {"status": "success", "message": "graph data dropped"} return {"status": "success", "message": "graph data dropped"}
except Exception as e: except Exception as e:
logger.error(f"Error dropping graph: {e}") logger.error(f"Error dropping graph: {e}")

View File

@@ -380,10 +380,10 @@ class PGKVStorage(BaseKVStorage):
     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs

         Args:
             ids (list[str]): List of document IDs to be deleted from storage

         Returns:
             None
         """
@@ -398,40 +398,41 @@ class PGKVStorage(BaseKVStorage):
delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
try: try:
await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) await self.db.execute(
logger.debug(f"Successfully deleted {len(ids)} records from {self.namespace}") delete_sql, {"workspace": self.db.workspace, "ids": ids}
)
logger.debug(
f"Successfully deleted {len(ids)} records from {self.namespace}"
)
except Exception as e: except Exception as e:
logger.error(f"Error while deleting records from {self.namespace}: {e}") logger.error(f"Error while deleting records from {self.namespace}: {e}")
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
"""Delete specific records from storage by cache mode """Delete specific records from storage by cache mode
Args: Args:
modes (list[str]): List of cache modes to be dropped from storage modes (list[str]): List of cache modes to be dropped from storage
Returns: Returns:
bool: True if successful, False otherwise bool: True if successful, False otherwise
""" """
if not modes: if not modes:
return False return False
try: try:
table_name = namespace_to_table_name(self.namespace) table_name = namespace_to_table_name(self.namespace)
if not table_name: if not table_name:
return False return False
if table_name != "LIGHTRAG_LLM_CACHE": if table_name != "LIGHTRAG_LLM_CACHE":
return False return False
sql = f""" sql = f"""
DELETE FROM {table_name} DELETE FROM {table_name}
WHERE workspace = $1 AND mode = ANY($2) WHERE workspace = $1 AND mode = ANY($2)
""" """
params = { params = {"workspace": self.db.workspace, "modes": modes}
"workspace": self.db.workspace,
"modes": modes
}
logger.info(f"Deleting cache by modes: {modes}") logger.info(f"Deleting cache by modes: {modes}")
await self.db.execute(sql, params) await self.db.execute(sql, params)
return True return True
@@ -444,8 +445,11 @@ class PGKVStorage(BaseKVStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }

             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -622,7 +626,9 @@ class PGVectorStorage(BaseVectorStorage):
delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)" delete_sql = f"DELETE FROM {table_name} WHERE workspace=$1 AND id = ANY($2)"
try: try:
await self.db.execute(delete_sql, {"workspace": self.db.workspace, "ids": ids}) await self.db.execute(
delete_sql, {"workspace": self.db.workspace, "ids": ids}
)
logger.debug( logger.debug(
f"Successfully deleted {len(ids)} vectors from {self.namespace}" f"Successfully deleted {len(ids)} vectors from {self.namespace}"
) )
@@ -759,8 +765,11 @@ class PGVectorStorage(BaseVectorStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }

             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -930,8 +939,11 @@ class PGDocStatusStorage(DocStatusStorage):
         try:
             table_name = namespace_to_table_name(self.namespace)
             if not table_name:
-                return {"status": "error", "message": f"Unknown namespace: {self.namespace}"}
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }

             drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                 table_name=table_name
             )
@@ -1626,7 +1638,7 @@ class PGGraphStorage(BaseGraphStorage):
             MATCH (n)
             DETACH DELETE n
             $$) AS (result agtype)"""

             await self._query(drop_query, readonly=False)
             return {"status": "success", "message": "graph data dropped"}
         except Exception as e:
@@ -1812,7 +1824,7 @@ SQL_TEMPLATES = {
             chunk_ids=EXCLUDED.chunk_ids,
             file_path=EXCLUDED.file_path,
             update_time = CURRENT_TIMESTAMP
     """,
     "relationships": """
     WITH relevant_chunks AS (
         SELECT id as chunk_id
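
For readers unfamiliar with the upsert idiom these SQL templates rely on: in PostgreSQL's INSERT ... ON CONFLICT DO UPDATE, EXCLUDED refers to the row that was proposed for insertion, so assignments like chunk_ids=EXCLUDED.chunk_ids keep the incoming values when a conflict occurs. Below is a hedged sketch with illustrative table and column names, not the actual LightRAG schema.

# Hypothetical table/columns, shown only to illustrate the EXCLUDED idiom
upsert_sql = """
INSERT INTO demo_chunks (id, workspace, chunk_ids, update_time)
VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
ON CONFLICT (id, workspace) DO UPDATE SET
    chunk_ids = EXCLUDED.chunk_ids,
    update_time = CURRENT_TIMESTAMP
"""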

View File

@@ -13,11 +13,12 @@ import pipmaster as pm
if not pm.is_installed("qdrant-client"): if not pm.is_installed("qdrant-client"):
pm.install("qdrant-client") pm.install("qdrant-client")
from qdrant_client import QdrantClient, models # type: ignore from qdrant_client import QdrantClient, models # type: ignore
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
def compute_mdhash_id_for_qdrant( def compute_mdhash_id_for_qdrant(
content: str, prefix: str = "", style: str = "simple" content: str, prefix: str = "", style: str = "simple"
) -> str: ) -> str:
@@ -272,7 +273,7 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error searching for prefix '{prefix}': {e}")
             return []

     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         """Get vector data by its ID
@@ -285,22 +286,22 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         try:
             # Convert to Qdrant compatible ID
             qdrant_id = compute_mdhash_id_for_qdrant(id)

             # Retrieve the point by ID
             result = self._client.retrieve(
                 collection_name=self.namespace,
                 ids=[qdrant_id],
                 with_payload=True,
             )

             if not result:
                 return None

             return result[0].payload
         except Exception as e:
             logger.error(f"Error retrieving vector data for ID {id}: {e}")
             return None

     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
         """Get multiple vector data by their IDs
@@ -312,28 +313,28 @@ class QdrantVectorDBStorage(BaseVectorStorage):
""" """
if not ids: if not ids:
return [] return []
try: try:
# Convert to Qdrant compatible IDs # Convert to Qdrant compatible IDs
qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids] qdrant_ids = [compute_mdhash_id_for_qdrant(id) for id in ids]
# Retrieve the points by IDs # Retrieve the points by IDs
results = self._client.retrieve( results = self._client.retrieve(
collection_name=self.namespace, collection_name=self.namespace,
ids=qdrant_ids, ids=qdrant_ids,
with_payload=True, with_payload=True,
) )
return [point.payload for point in results] return [point.payload for point in results]
except Exception as e: except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}") logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return [] return []
async def drop(self) -> dict[str, str]: async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources """Drop all vector data from storage and clean up resources
This method will delete all data from the Qdrant collection. This method will delete all data from the Qdrant collection.
Returns: Returns:
dict[str, str]: Operation status and message dict[str, str]: Operation status and message
- On success: {"status": "success", "message": "data dropped"} - On success: {"status": "success", "message": "data dropped"}
@@ -343,17 +344,20 @@ class QdrantVectorDBStorage(BaseVectorStorage):
             # Delete the collection and recreate it
             if self._client.collection_exists(self.namespace):
                 self._client.delete_collection(self.namespace)

             # Recreate the collection
             QdrantVectorDBStorage.create_collection_if_not_exist(
                 self._client,
                 self.namespace,
                 vectors_config=models.VectorParams(
-                    size=self.embedding_func.embedding_dim, distance=models.Distance.COSINE
+                    size=self.embedding_func.embedding_dim,
+                    distance=models.Distance.COSINE,
                 ),
             )

-            logger.info(f"Process {os.getpid()} drop Qdrant collection {self.namespace}")
+            logger.info(
+                f"Process {os.getpid()} drop Qdrant collection {self.namespace}"
+            )
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
             logger.error(f"Error dropping Qdrant collection {self.namespace}: {e}")

View File

@@ -8,7 +8,7 @@ if not pm.is_installed("redis"):
pm.install("redis") pm.install("redis")
# aioredis is a depricated library, replaced with redis # aioredis is a depricated library, replaced with redis
from redis.asyncio import Redis # type: ignore from redis.asyncio import Redis # type: ignore
from lightrag.utils import logger from lightrag.utils import logger
from lightrag.base import BaseKVStorage from lightrag.base import BaseKVStorage
import json import json
@@ -83,51 +83,51 @@ class RedisKVStorage(BaseKVStorage):
        logger.info(
            f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
        )

    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
        """Delete specific records from storage by cache mode

        Important notes for Redis storage:
        1. This will immediately delete the specified cache modes from Redis

        Args:
            modes (list[str]): List of cache modes to be dropped from storage

        Returns:
            True: if the cache was dropped successfully
            False: if the cache drop failed
        """
        if not modes:
            return False

        try:
            await self.delete(modes)
            return True
        except Exception:
            return False
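
Illustrative usage, assuming an already-initialized RedisKVStorage backing the LLM response cache (the mode names are examples, not an exhaustive list):

# Hypothetical call site inside an async context
dropped = await kv_storage.drop_cache_by_modes(["local", "global"])
if not dropped:
    logger.warning("Cache drop failed or no modes were given")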

    async def drop(self) -> dict[str, str]:
        """Drop the storage by removing all keys under the current namespace.

        Returns:
            dict[str, str]: Status of the operation with keys 'status' and 'message'
        """
        try:
            keys = await self._redis.keys(f"{self.namespace}:*")

            if keys:
                pipe = self._redis.pipeline()
                for key in keys:
                    pipe.delete(key)
                results = await pipe.execute()
                deleted_count = sum(results)

                logger.info(f"Dropped {deleted_count} keys from {self.namespace}")
                return {"status": "success", "message": f"{deleted_count} keys dropped"}
            else:
                logger.info(f"No keys found to drop in {self.namespace}")
                return {"status": "success", "message": "no keys to drop"}
        except Exception as e:
            logger.error(f"Error dropping keys from {self.namespace}: {e}")
            return {"status": "error", "message": str(e)}

View File

@@ -20,7 +20,7 @@ if not pm.is_installed("pymysql"):
if not pm.is_installed("sqlalchemy"):
    pm.install("sqlalchemy")

from sqlalchemy import create_engine, text  # type: ignore


class TiDB:
@@ -290,47 +290,49 @@ class TiDBKVStorage(BaseKVStorage):
        try:
            table_name = namespace_to_table_name(self.namespace)
            id_field = namespace_to_id(self.namespace)

            if not table_name or not id_field:
                logger.error(f"Unknown namespace for deletion: {self.namespace}")
                return

            ids_list = ",".join([f"'{id}'" for id in ids])
            delete_sql = f"DELETE FROM {table_name} WHERE workspace = :workspace AND {id_field} IN ({ids_list})"

            await self.db.execute(delete_sql, {"workspace": self.db.workspace})
            logger.info(
                f"Successfully deleted {len(ids)} records from {self.namespace}"
            )
        except Exception as e:
            logger.error(f"Error deleting records from {self.namespace}: {e}")

    async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
        """Delete specific records from storage by cache mode

        Args:
            modes (list[str]): List of cache modes to be dropped from storage

        Returns:
            bool: True if successful, False otherwise
        """
        if not modes:
            return False

        try:
            table_name = namespace_to_table_name(self.namespace)
            if not table_name:
                return False

            if table_name != "LIGHTRAG_LLM_CACHE":
                return False

            # Build a MySQL-style IN clause
            modes_list = ", ".join([f"'{mode}'" for mode in modes])
            sql = f"""
            DELETE FROM {table_name}
            WHERE workspace = :workspace
            AND mode IN ({modes_list})
            """

            logger.info(f"Deleting cache by modes: {modes}")
            await self.db.execute(sql, {"workspace": self.db.workspace})
            return True
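
Both delete() above and this method splice quoted values into the SQL string by hand. The ids and modes come from internal code paths, but the same deletion can be expressed with SQLAlchemy's expanding bind parameters, which keep values out of the SQL text entirely. A sketch with a placeholder DSN and example mode names, not what the storage currently does:

from sqlalchemy import bindparam, create_engine, text

engine = create_engine("mysql+pymysql://user:pass@host:4000/db")  # placeholder DSN
stmt = text(
    "DELETE FROM LIGHTRAG_LLM_CACHE WHERE workspace = :workspace AND mode IN :modes"
).bindparams(bindparam("modes", expanding=True))  # expands the list into bound values

with engine.begin() as conn:
    conn.execute(stmt, {"workspace": "default", "modes": ["local", "global"]})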
@@ -343,8 +345,11 @@ class TiDBKVStorage(BaseKVStorage):
        try:
            table_name = namespace_to_table_name(self.namespace)
            if not table_name:
                return {
                    "status": "error",
                    "message": f"Unknown namespace: {self.namespace}",
                }

            drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                table_name=table_name
            )
@@ -492,7 +497,7 @@ class TiDBVectorDBStorage(BaseVectorStorage):
        table_name = namespace_to_table_name(self.namespace)
        id_field = namespace_to_id(self.namespace)

        if not table_name or not id_field:
            logger.error(f"Unknown namespace for vector deletion: {self.namespace}")
            return
@@ -502,7 +507,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
        try:
            await self.db.execute(delete_sql, {"workspace": self.db.workspace})
            logger.debug(
                f"Successfully deleted {len(ids)} vectors from {self.namespace}"
            )
        except Exception as e:
            logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
@@ -551,8 +558,11 @@ class TiDBVectorDBStorage(BaseVectorStorage):
        try:
            table_name = namespace_to_table_name(self.namespace)
            if not table_name:
                return {
                    "status": "error",
                    "message": f"Unknown namespace: {self.namespace}",
                }

            drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
                table_name=table_name
            )