cleaned code
This commit is contained in:
@@ -21,7 +21,7 @@ class JsonKVStorage(BaseKVStorage):
|
|||||||
self._data: dict[str, Any] = load_json(self._file_name) or {}
|
self._data: dict[str, Any] = load_json(self._file_name) or {}
|
||||||
self._lock = asyncio.Lock()
|
self._lock = asyncio.Lock()
|
||||||
logger.info(f"Load KV {self.namespace} with {len(self._data)} data")
|
logger.info(f"Load KV {self.namespace} with {len(self._data)} data")
|
||||||
|
|
||||||
async def index_done_callback(self):
|
async def index_done_callback(self):
|
||||||
write_json(self._data, self._file_name)
|
write_json(self._data, self._file_name)
|
||||||
|
|
||||||
@@ -46,4 +46,4 @@ class JsonKVStorage(BaseKVStorage):
|
|||||||
self._data.update(left_data)
|
self._data.update(left_data)
|
||||||
|
|
||||||
async def drop(self) -> None:
|
async def drop(self) -> None:
|
||||||
self._data = {}
|
self._data = {}
|
||||||
|
@@ -50,7 +50,7 @@ Usage:
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any, Union, Dict
|
from typing import Any, Union
|
||||||
|
|
||||||
from lightrag.utils import (
|
from lightrag.utils import (
|
||||||
logger,
|
logger,
|
||||||
@@ -85,18 +85,18 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_status_counts(self) -> Dict[str, int]:
|
async def get_status_counts(self) -> dict[str, int]:
|
||||||
"""Get counts of documents in each status"""
|
"""Get counts of documents in each status"""
|
||||||
counts = {status: 0 for status in DocStatus}
|
counts = {status: 0 for status in DocStatus}
|
||||||
for doc in self._data.values():
|
for doc in self._data.values():
|
||||||
counts[doc["status"]] += 1
|
counts[doc["status"]] += 1
|
||||||
return counts
|
return counts
|
||||||
|
|
||||||
async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
|
async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
|
||||||
"""Get all failed documents"""
|
"""Get all failed documents"""
|
||||||
return {k: v for k, v in self._data.items() if v["status"] == DocStatus.FAILED}
|
return {k: v for k, v in self._data.items() if v["status"] == DocStatus.FAILED}
|
||||||
|
|
||||||
async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
|
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
|
||||||
"""Get all pending documents"""
|
"""Get all pending documents"""
|
||||||
return {k: v for k, v in self._data.items() if v["status"] == DocStatus.PENDING}
|
return {k: v for k, v in self._data.items() if v["status"] == DocStatus.PENDING}
|
||||||
|
|
||||||
|
@@ -74,6 +74,7 @@ class MongoKVStorage(BaseKVStorage):
|
|||||||
"""Drop the collection"""
|
"""Drop the collection"""
|
||||||
await self._data.drop()
|
await self._data.drop()
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class MongoGraphStorage(BaseGraphStorage):
|
class MongoGraphStorage(BaseGraphStorage):
|
||||||
"""
|
"""
|
||||||
|
@@ -20,7 +20,7 @@ class RedisKVStorage(BaseKVStorage):
|
|||||||
redis_url = os.environ.get("REDIS_URI", "redis://localhost:6379")
|
redis_url = os.environ.get("REDIS_URI", "redis://localhost:6379")
|
||||||
self._redis = Redis.from_url(redis_url, decode_responses=True)
|
self._redis = Redis.from_url(redis_url, decode_responses=True)
|
||||||
logger.info(f"Use Redis as KV {self.namespace}")
|
logger.info(f"Use Redis as KV {self.namespace}")
|
||||||
|
|
||||||
async def get_by_id(self, id):
|
async def get_by_id(self, id):
|
||||||
data = await self._redis.get(f"{self.namespace}:{id}")
|
data = await self._redis.get(f"{self.namespace}:{id}")
|
||||||
return json.loads(data) if data else None
|
return json.loads(data) if data else None
|
||||||
@@ -53,4 +53,4 @@ class RedisKVStorage(BaseKVStorage):
|
|||||||
async def drop(self) -> None:
|
async def drop(self) -> None:
|
||||||
keys = await self._redis.keys(f"{self.namespace}:*")
|
keys = await self._redis.keys(f"{self.namespace}:*")
|
||||||
if keys:
|
if keys:
|
||||||
await self._redis.delete(*keys)
|
await self._redis.delete(*keys)
|
||||||
|
Reference in New Issue
Block a user