From 7a89916bab12966f1b1398afcf9fb048b1704cf6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Thu, 13 Feb 2025 01:27:27 +0800 Subject: [PATCH] Add method to retrieve in-progress documents in DocStatusStorage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add get_processing_docs() abstract method • Override get_processing_docs() in PG storage • Method retrieves docs with PROCESSING status • Keep consistent with existing status methods --- lightrag/base.py | 4 ++++ lightrag/kg/postgres_impl.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/lightrag/base.py b/lightrag/base.py index bd79d990..147cb444 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -227,6 +227,10 @@ class DocStatusStorage(BaseKVStorage): """Get all pending documents""" raise NotImplementedError + async def get_processing_docs(self) -> dict[str, DocProcessingStatus]: + """Get all documents that are currently being processed""" + raise NotImplementedError + async def update_doc_status(self, data: dict[str, Any]) -> None: """Updates the status of a document. By default, it calls upsert.""" await self.upsert(data) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 08d559fb..4b6f524f 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -493,6 +493,10 @@ class PGDocStatusStorage(DocStatusStorage): """Get all pending documents""" return await self.get_docs_by_status(DocStatus.PENDING) + async def get_processing_docs(self) -> Dict[str, DocProcessingStatus]: + """Get all documents that are currently being processed""" + return await self.get_docs_by_status(DocStatus.PROCESSING) + async def index_done_callback(self): """Save data after indexing, but for PostgreSQL, we already saved them during the upsert stage, so no action to take here""" logger.info("Doc status had been saved into postgresql db!")