diff --git a/lightrag/base.py b/lightrag/base.py index 1a7f9c2e..68978280 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -202,3 +202,7 @@ class DocStatusStorage(BaseKVStorage): async def get_pending_docs(self) -> dict[str, DocProcessingStatus]: """Get all pending documents""" raise NotImplementedError + + async def update_doc_status(self, data: dict[str, Any]) -> None: + """Updates the status of a document. By default, it calls upsert.""" + await self.upsert(data) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index d319f6f9..f797d2b6 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -471,7 +471,7 @@ class PGDocStatusStorage(DocStatusStorage): self, status: DocStatus ) -> Dict[str, DocProcessingStatus]: """Get all documents by status""" - sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$1" + sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2" params = {"workspace": self.db.workspace, "status": status} result = await self.db.query(sql, params, True) return { @@ -505,8 +505,8 @@ class PGDocStatusStorage(DocStatusStorage): Args: data: Dictionary of document IDs and their status data """ - sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status) - values($1,$2,$3,$4,$5,$6) + sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status) + values($1,$2,$3,$4,$5,$6,$7) on conflict(id,workspace) do update set content = EXCLUDED.content, content_summary = EXCLUDED.content_summary, @@ -530,6 +530,30 @@ class PGDocStatusStorage(DocStatusStorage): ) return data + async def update_doc_status(self, data: dict[str, dict]) -> None: + """ + Updates only the document status, chunk count, and updated timestamp. + + This method ensures that only relevant fields are updated instead of overwriting + the entire document record. 
`updated_at` is always set to the database's CURRENT_TIMESTAMP; + any `updated_at` in `data` is ignored, and a missing `chunks_count` is written as -1. + """ + sql = """ + UPDATE LIGHTRAG_DOC_STATUS + SET status = $3, + chunks_count = $4, + updated_at = CURRENT_TIMESTAMP + WHERE workspace = $1 AND id = $2 + """ + for k, v in data.items(): + _data = { + "workspace": self.db.workspace, + "id": k, + "status": v["status"].value, # Convert Enum to string + "chunks_count": v.get("chunks_count", -1), # Default to -1 if not provided + } + await self.db.execute(sql, _data) + class PGGraphQueryException(Exception): """Exception for the AGE queries.""" @@ -1103,6 +1127,7 @@ TABLES = { "ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS ( workspace varchar(255) NOT NULL, id varchar(255) NOT NULL, + content TEXT, content_summary varchar(255) NULL, content_length int4 NULL, chunks_count int4 NULL, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 347f0f4c..594d1997 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -542,6 +542,7 @@ class LightRAG: doc_status_id: { "status": DocStatus.PROCESSING, "updated_at": datetime.now().isoformat(), + "content": status_doc.content, "content_summary": status_doc.content_summary, "content_length": status_doc.content_length, "created_at": status_doc.created_at, @@ -573,7 +574,7 @@ class LightRAG: ] try: await asyncio.gather(*tasks) - await self.doc_status.upsert( + await self.doc_status.update_doc_status( { doc_status_id: { "status": DocStatus.PROCESSED, @@ -586,7 +587,7 @@ class LightRAG: except Exception as e: logger.error(f"Failed to process document {doc_id}: {str(e)}") - await self.doc_status.upsert( + await self.doc_status.update_doc_status( { doc_status_id: { "status": DocStatus.FAILED,