diff --git a/lightrag/base.py b/lightrag/base.py index 798e3176..ee13a11f 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -260,8 +260,3 @@ class DocStatusStorage(BaseKVStorage, ABC): @abstractmethod async def get_processed_docs(self) -> dict[str, DocProcessingStatus]: """Get all procesed documents""" - - @abstractmethod - async def update_doc_status(self, data: dict[str, Any]) -> None: - """Updates the status of a document. By default, it calls upsert.""" - await self.upsert(data) diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index b96a744c..15fbfcde 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -75,7 +75,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def index_done_callback(self) -> None: write_json(self._data, self._file_name) - async def upsert(self, data: dict[str, Any]) -> None: + async def upsert(self, data: dict[str, dict[str, Any]]) -> None: self._data.update(data) await self.index_done_callback() @@ -89,6 +89,3 @@ class JsonDocStatusStorage(DocStatusStorage): async def drop(self) -> None: raise NotImplementedError - - async def update_doc_status(self, data: dict[str, Any]) -> None: - raise NotImplementedError diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 2b7996d6..0dffa7d3 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -429,9 +429,6 @@ class PGVectorStorage(BaseVectorStorage): @final @dataclass class PGDocStatusStorage(DocStatusStorage): - # db instance must be injected before use - # db: PostgreSQLDB - async def filter_keys(self, keys: set[str]) -> set[str]: """Return keys that don't exist in storage""" keys = ",".join([f"'{_id}'" for _id in keys]) @@ -516,7 +513,7 @@ class PGDocStatusStorage(DocStatusStorage): async def index_done_callback(self) -> None: pass - async def upsert(self, data: dict[str, dict]): + async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """Update or insert document status Args: @@ -547,32 +544,6 @@ class PGDocStatusStorage(DocStatusStorage): ) return data - async def update_doc_status(self, data: dict[str, dict]) -> None: - """ - Updates only the document status, chunk count, and updated timestamp. - - This method ensures that only relevant fields are updated instead of overwriting - the entire document record. If `updated_at` is not provided, the database will - automatically use the current timestamp. - """ - sql = """ - UPDATE LIGHTRAG_DOC_STATUS - SET status = $3, - chunks_count = $4, - updated_at = CURRENT_TIMESTAMP - WHERE workspace = $1 AND id = $2 - """ - for k, v in data.items(): - _data = { - "workspace": self.db.workspace, - "id": k, - "status": v["status"].value, # Convert Enum to string - "chunks_count": v.get( - "chunks_count", -1 - ), # Default to -1 if not provided - } - await self.db.execute(sql, _data) - async def drop(self) -> None: raise NotImplementedError diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 23c3df80..f4e9b770 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -779,7 +779,7 @@ class LightRAG: ] try: await asyncio.gather(*tasks) - await self.doc_status.update_doc_status( + await self.doc_status.upsert( { doc_status_id: { "status": DocStatus.PROCESSED, @@ -796,7 +796,7 @@ class LightRAG: except Exception as e: logger.error(f"Failed to process document {doc_id}: {str(e)}") - await self.doc_status.update_doc_status( + await self.doc_status.upsert( { doc_status_id: { "status": DocStatus.FAILED,