updated wrong status

This commit is contained in:
Yannick Stephan
2025-02-16 15:52:59 +01:00
parent a1607bbcb9
commit 2bf238396e
5 changed files with 17 additions and 78 deletions

View File

@@ -1,9 +1,9 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import StrEnum
import os import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum
from typing import ( from typing import (
Any, Any,
Literal, Literal,
@@ -203,7 +203,7 @@ class BaseGraphStorage(StorageNameSpace, ABC):
"""Retrieve a subgraph of the knowledge graph starting from a given node.""" """Retrieve a subgraph of the knowledge graph starting from a given node."""
class DocStatus(str, Enum): class DocStatus(StrEnum):
"""Document processing status enum""" """Document processing status enum"""
PENDING = "pending" PENDING = "pending"
@@ -245,18 +245,7 @@ class DocStatusStorage(BaseKVStorage, ABC):
"""Get counts of documents in each status""" """Get counts of documents in each status"""
@abstractmethod @abstractmethod
async def get_failed_docs(self) -> dict[str, DocProcessingStatus]: async def get_docs_by_status(
"""Get all failed documents""" self, status: DocStatus
) -> dict[str, DocProcessingStatus]:
@abstractmethod """Get all documents with a specific status"""
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all pending documents"""
raise NotImplementedError
@abstractmethod
async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all processing documents"""
@abstractmethod
async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all procesed documents"""

View File

@@ -44,33 +44,15 @@ class JsonDocStatusStorage(DocStatusStorage):
counts[doc["status"]] += 1 counts[doc["status"]] += 1
return counts return counts
async def get_failed_docs(self) -> dict[str, DocProcessingStatus]: async def get_docs_by_status(
self, status: DocStatus
) -> dict[str, DocProcessingStatus]:
"""Get all documents with a specific status"""
return { return {
k: DocProcessingStatus(**v) k: DocProcessingStatus(**v)
for k, v in self._data.items() for k, v in self._data.items()
if v["status"] == DocStatus.FAILED if v["status"] == status.value
} }
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
return {
k: DocProcessingStatus(**v)
for k, v in self._data.items()
if v["status"] == DocStatus.PENDING
}
async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
return {
k: DocProcessingStatus(**v)
for k, v in self._data.items()
if v["status"] == DocStatus.PROCESSED
}
async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
return {
k: DocProcessingStatus(**v)
for k, v in self._data.items()
if v["status"] == DocStatus.PROCESSING
}
async def index_done_callback(self) -> None: async def index_done_callback(self) -> None:
write_json(self._data, self._file_name) write_json(self._data, self._file_name)

View File

@@ -201,22 +201,6 @@ class MongoDocStatusStorage(DocStatusStorage):
for doc in result for doc in result
} }
async def get_failed_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all failed documents"""
return await self.get_docs_by_status(DocStatus.FAILED)
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all pending documents"""
return await self.get_docs_by_status(DocStatus.PENDING)
async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all processing documents"""
return await self.get_docs_by_status(DocStatus.PROCESSING)
async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all procesed documents"""
return await self.get_docs_by_status(DocStatus.PROCESSED)
async def index_done_callback(self) -> None: async def index_done_callback(self) -> None:
# Implement the method here # Implement the method here
pass pass

View File

@@ -484,7 +484,7 @@ class PGDocStatusStorage(DocStatusStorage):
) -> Dict[str, DocProcessingStatus]: ) -> Dict[str, DocProcessingStatus]:
"""all documents with a specific status""" """all documents with a specific status"""
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2" sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
params = {"workspace": self.db.workspace, "status": status} params = {"workspace": self.db.workspace, "status": status.value}
result = await self.db.query(sql, params, True) result = await self.db.query(sql, params, True)
return { return {
element["id"]: DocProcessingStatus( element["id"]: DocProcessingStatus(
@@ -499,22 +499,6 @@ class PGDocStatusStorage(DocStatusStorage):
for element in result for element in result
} }
async def get_failed_docs(self) -> Dict[str, DocProcessingStatus]:
"""Get all failed documents"""
return await self.get_docs_by_status(DocStatus.FAILED)
async def get_pending_docs(self) -> Dict[str, DocProcessingStatus]:
"""Get all pending documents"""
return await self.get_docs_by_status(DocStatus.PENDING)
async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all processing documents"""
return await self.get_docs_by_status(DocStatus.PROCESSING)
async def get_processed_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all procesed documents"""
return await self.get_docs_by_status(DocStatus.PROCESSED)
async def index_done_callback(self) -> None: async def index_done_callback(self) -> None:
pass pass

View File

@@ -1,5 +1,5 @@
import os import os
from typing import Any from typing import Any, final
from tqdm.asyncio import tqdm as tqdm_async from tqdm.asyncio import tqdm as tqdm_async
from dataclasses import dataclass from dataclasses import dataclass
import pipmaster as pm import pipmaster as pm
@@ -18,7 +18,7 @@ import json
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini", "utf-8") config.read("config.ini", "utf-8")
@final
@dataclass @dataclass
class RedisKVStorage(BaseKVStorage): class RedisKVStorage(BaseKVStorage):
def __post_init__(self): def __post_init__(self):