Add method to retrieve in-progress documents in DocStatusStorage

• Add get_processing_docs() abstract method
• Override get_processing_docs() in PG storage
• Method retrieves docs with PROCESSING status
• Keep consistent with existing status methods
This commit is contained in:
yangdx
2025-02-13 01:27:27 +08:00
parent 4c39cf399d
commit 7a89916bab
2 changed files with 8 additions and 0 deletions

View File

@@ -227,6 +227,10 @@ class DocStatusStorage(BaseKVStorage):
"""Get all pending documents""" """Get all pending documents"""
raise NotImplementedError raise NotImplementedError
async def get_processing_docs(self) -> dict[str, DocProcessingStatus]:
"""Get all documents that are currently being processed"""
raise NotImplementedError
async def update_doc_status(self, data: dict[str, Any]) -> None: async def update_doc_status(self, data: dict[str, Any]) -> None:
"""Updates the status of a document. By default, it calls upsert.""" """Updates the status of a document. By default, it calls upsert."""
await self.upsert(data) await self.upsert(data)

View File

@@ -493,6 +493,10 @@ class PGDocStatusStorage(DocStatusStorage):
"""Get all pending documents""" """Get all pending documents"""
return await self.get_docs_by_status(DocStatus.PENDING) return await self.get_docs_by_status(DocStatus.PENDING)
async def get_processing_docs(self) -> Dict[str, DocProcessingStatus]:
"""Get all documents that are currently being processed"""
return await self.get_docs_by_status(DocStatus.PROCESSING)
async def index_done_callback(self): async def index_done_callback(self):
"""Save data after indexing, but for PostgreSQL, we already saved them during the upsert stage, so no action to take here""" """Save data after indexing, but for PostgreSQL, we already saved them during the upsert stage, so no action to take here"""
logger.info("Doc status had been saved into postgresql db!") logger.info("Doc status had been saved into postgresql db!")