diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 2fb753fe..f19463ce 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -51,6 +51,7 @@ Usage: import asyncio import os from dataclasses import dataclass +from typing import Any, Union from lightrag.utils import ( logger, @@ -68,7 +69,7 @@ class JsonKVStorage(BaseKVStorage): def __post_init__(self): working_dir = self.global_config["working_dir"] self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json") - self._data = load_json(self._file_name) or {} + self._data: dict[str, Any] = load_json(self._file_name) or {} self._lock = asyncio.Lock() logger.info(f"Load KV {self.namespace} with {len(self._data)} data") @@ -78,15 +79,13 @@ class JsonKVStorage(BaseKVStorage): async def index_done_callback(self): write_json(self._data, self._file_name) - async def get_by_id(self, id): + async def get_by_id(self, id: str): return self._data.get(id, None) - async def get_by_ids(self, ids, fields=None): - if fields is None: - return [self._data.get(id, None) for id in ids] + async def get_by_ids(self, ids: list[str]): return [ ( - {k: v for k, v in self._data[id].items() if k in fields} + {k: v for k, v in self._data[id].items() } if self._data.get(id, None) else None ) @@ -96,7 +95,7 @@ class JsonKVStorage(BaseKVStorage): async def filter_keys(self, data: list[str]) -> set[str]: return set([s for s in data if s not in self._data]) - async def upsert(self, data: dict[str, dict]): + async def upsert(self, data: dict[str, dict[str, Any]]): left_data = {k: v for k, v in data.items() if k not in self._data} self._data.update(left_data) return left_data @@ -104,31 +103,8 @@ class JsonKVStorage(BaseKVStorage): async def drop(self): self._data = {} - async def filter(self, filter_func): - """Filter key-value pairs based on a filter function - - Args: - filter_func: The filter function, which takes a value as an argument and returns a boolean value - - Returns: - Dict: Key-value pairs that meet the condition - """ - result = {} - async with self._lock: - for key, value in self._data.items(): - if filter_func(value): - result[key] = value - return result - - async def delete(self, ids: list[str]): - """Delete data with specified IDs - - Args: - ids: List of IDs to delete - """ - async with self._lock: - for id in ids: - if id in self._data: - del self._data[id] - await self.index_done_callback() - logger.info(f"Successfully deleted {len(ids)} items from {self.namespace}") + async def get_by_status_and_ids( + self, status: str + ) -> Union[list[dict[str, Any]], None]: + result = [v for _, v in self._data.items() if v["status"] == status] + return result if result else None