# Source: lightrag/storage/jsondocstatus_storage.py
# Introduced by commit d274beb9d24eb16280e7599a955564ac7447186d
# ("Create jsondocstatus_storage.py", Saifeddine ALOUI,
# Mon, 27 Jan 2025 09:08:14 +0100).
"""
JsonDocStatus Storage Module
============================

This module provides a JSON-file-backed implementation of document status
storage for LightRAG.

The `JsonDocStatusStorage` class extends the `DocStatusStorage` class from the
LightRAG library. It keeps one status record per document ID in an in-memory
dictionary that is loaded from, and written back to, a
`kv_store_<namespace>.json` file inside the configured working directory.

Author: lightrag team
Created: 2024-01-25
License: MIT

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Version: 1.0.0

Dependencies:
    - LightRAG (`lightrag.base`, `lightrag.utils`)
    - NumPy (imported at module level)
    - tqdm (imported at module level)

Features:
    - Load document status records from a JSON file at construction time
    - Filter a list of document IDs down to those still needing processing
    - Count documents per status; list failed and pending documents
    - Insert/update, fetch and delete records, writing each change to disk

Usage:
    from lightrag.storage.jsondocstatus_storage import JsonDocStatusStorage

"""


# NOTE(review): several of the names imported below are not referenced in this
# module. They are kept so that the set of names the module exposes is
# unchanged.
import asyncio
import html
import os
import time
from dataclasses import dataclass
from typing import Any, Union, cast, Dict

import numpy as np
from tqdm.asyncio import tqdm as tqdm_async

from lightrag.utils import (
    logger,
    load_json,
    write_json,
    compute_mdhash_id,
)

from lightrag.base import (
    BaseGraphStorage,
    BaseKVStorage,
    BaseVectorStorage,
    DocStatus,
    DocProcessingStatus,
    DocStatusStorage,
)


@dataclass
class JsonDocStatusStorage(DocStatusStorage):
    """JSON implementation of document status storage.

    Records live in ``self._data``, a mapping of document ID to a status
    record. Each record is read with ``record["status"]``, so it must at
    least carry a ``"status"`` entry. The mapping is loaded from
    ``self._file_name`` on construction and written back by
    :meth:`index_done_callback`.
    """

    def __post_init__(self):
        """Resolve the backing file path and load any existing records."""
        working_dir = self.global_config["working_dir"]
        self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
        # ``load_json`` may yield a falsy value (e.g. nothing stored yet);
        # fall back to an empty mapping in that case.
        self._data = load_json(self._file_name) or {}
        logger.info(f"Loaded document status storage with {len(self._data)} records")

    async def filter_keys(self, data: list[str]) -> set[str]:
        """Return the keys that still need processing.

        Args:
            data: Candidate document IDs.

        Returns:
            The IDs from ``data`` that are either absent from storage or
            whose stored status is not ``DocStatus.PROCESSED``.
        """
        return {
            k
            for k in data
            if k not in self._data or self._data[k]["status"] != DocStatus.PROCESSED
        }

    async def get_status_counts(self) -> Dict[str, int]:
        """Get counts of documents in each status.

        Returns:
            A mapping with one entry per ``DocStatus`` member (zero when no
            document has that status).

        Raises:
            KeyError: If a stored record has a status that does not match
                any ``DocStatus`` member.
        """
        counts = {status: 0 for status in DocStatus}
        for doc in self._data.values():
            counts[doc["status"]] += 1
        return counts

    async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
        """Get all documents whose status is ``DocStatus.FAILED``."""
        return {k: v for k, v in self._data.items() if v["status"] == DocStatus.FAILED}

    async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
        """Get all documents whose status is ``DocStatus.PENDING``."""
        return {k: v for k, v in self._data.items() if v["status"] == DocStatus.PENDING}

    async def index_done_callback(self) -> None:
        """Save the in-memory records to the backing JSON file."""
        write_json(self._data, self._file_name)

    async def upsert(self, data: dict[str, dict]) -> dict[str, dict]:
        """Update or insert document status records and persist them.

        Args:
            data: Dictionary of document IDs and their status data. Entries
                replace any existing record stored under the same ID.

        Returns:
            The ``data`` argument, unchanged.
        """
        self._data.update(data)
        await self.index_done_callback()
        return data

    async def get_by_id(self, id: str) -> Union[DocProcessingStatus, None]:
        """Get the record stored under ``id``, or ``None`` if there is none.

        Performs the same lookup as :meth:`get`.
        """
        return self._data.get(id)

    async def get(self, doc_id: str) -> Union[DocProcessingStatus, None]:
        """Get document status by ID, or ``None`` if the ID is not stored."""
        return self._data.get(doc_id)

    async def delete(self, doc_ids: list[str]) -> None:
        """Delete document status records by ID and persist the result.

        IDs that are not present are ignored.

        Args:
            doc_ids: Document IDs to remove.
        """
        for doc_id in doc_ids:
            self._data.pop(doc_id, None)
        await self.index_done_callback()