fix(postgres): update document status with partial update instead of full upsert
This commit is contained in:
@@ -202,3 +202,7 @@ class DocStatusStorage(BaseKVStorage):
|
|||||||
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
|
async def get_pending_docs(self) -> dict[str, DocProcessingStatus]:
|
||||||
"""Get all pending documents"""
|
"""Get all pending documents"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def update_doc_status(self, data: dict[str, Any]) -> None:
|
||||||
|
"""Update document status records; this default implementation delegates to ``upsert`` and subclasses may override it."""
|
||||||
|
await self.upsert(data)
|
||||||
|
@@ -471,7 +471,7 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
self, status: DocStatus
|
self, status: DocStatus
|
||||||
) -> Dict[str, DocProcessingStatus]:
|
) -> Dict[str, DocProcessingStatus]:
|
||||||
"""Get all documents by status"""
|
"""Get all documents by status"""
|
||||||
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$1"
|
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
|
||||||
params = {"workspace": self.db.workspace, "status": status}
|
params = {"workspace": self.db.workspace, "status": status}
|
||||||
result = await self.db.query(sql, params, True)
|
result = await self.db.query(sql, params, True)
|
||||||
return {
|
return {
|
||||||
@@ -505,8 +505,8 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
Args:
|
Args:
|
||||||
data: Dictionary of document IDs and their status data
|
data: Dictionary of document IDs and their status data
|
||||||
"""
|
"""
|
||||||
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content_summary,content_length,chunks_count,status)
|
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status)
|
||||||
values($1,$2,$3,$4,$5,$6)
|
values($1,$2,$3,$4,$5,$6,$7)
|
||||||
on conflict(id,workspace) do update set
|
on conflict(id,workspace) do update set
|
||||||
content = EXCLUDED.content,
|
content = EXCLUDED.content,
|
||||||
content_summary = EXCLUDED.content_summary,
|
content_summary = EXCLUDED.content_summary,
|
||||||
@@ -530,6 +530,30 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
)
|
)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
async def update_doc_status(self, data: dict[str, dict]) -> None:
    """Update only the status, chunk count and timestamp of documents.

    Issues a plain ``UPDATE`` per document, so every other column of the
    row is left untouched and no row is inserted when the ID is unknown.
    ``updated_at`` is always set to the database's ``CURRENT_TIMESTAMP``;
    an ``updated_at`` value present in ``data`` is not sent to the database.

    Args:
        data: Mapping of document ID to a dict of fields. Each dict must
            contain ``"status"`` (an ``Enum`` member or its plain string
            value) and may contain ``"chunks_count"``, which defaults to
            -1 when absent.

    Raises:
        KeyError: If an entry has no ``"status"`` key.
    """
    # Local import: the file's top-level import block is not visible here.
    from enum import Enum

    sql = """
    UPDATE LIGHTRAG_DOC_STATUS
    SET status = $3,
        chunks_count = $4,
        updated_at = CURRENT_TIMESTAMP
    WHERE workspace = $1 AND id = $2
    """
    workspace = self.db.workspace
    for doc_id, fields in data.items():
        status = fields["status"]
        # NOTE(review): presumably self.db.execute binds the dict's values
        # positionally, so key order must stay aligned with $1..$4 - confirm.
        params = {
            "workspace": workspace,
            "id": doc_id,
            # Accept both Enum members and already-serialized strings.
            "status": status.value if isinstance(status, Enum) else status,
            "chunks_count": fields.get("chunks_count", -1),
        }
        await self.db.execute(sql, params)
|
||||||
|
|
||||||
|
|
||||||
class PGGraphQueryException(Exception):
|
class PGGraphQueryException(Exception):
|
||||||
"""Exception for the AGE queries."""
|
"""Exception for the AGE queries."""
|
||||||
@@ -1103,6 +1127,7 @@ TABLES = {
|
|||||||
"ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS (
|
"ddl": """CREATE TABLE LIGHTRAG_DOC_STATUS (
|
||||||
workspace varchar(255) NOT NULL,
|
workspace varchar(255) NOT NULL,
|
||||||
id varchar(255) NOT NULL,
|
id varchar(255) NOT NULL,
|
||||||
|
content TEXT,
|
||||||
content_summary varchar(255) NULL,
|
content_summary varchar(255) NULL,
|
||||||
content_length int4 NULL,
|
content_length int4 NULL,
|
||||||
chunks_count int4 NULL,
|
chunks_count int4 NULL,
|
||||||
|
@@ -542,6 +542,7 @@ class LightRAG:
|
|||||||
doc_status_id: {
|
doc_status_id: {
|
||||||
"status": DocStatus.PROCESSING,
|
"status": DocStatus.PROCESSING,
|
||||||
"updated_at": datetime.now().isoformat(),
|
"updated_at": datetime.now().isoformat(),
|
||||||
|
"content": status_doc.content,
|
||||||
"content_summary": status_doc.content_summary,
|
"content_summary": status_doc.content_summary,
|
||||||
"content_length": status_doc.content_length,
|
"content_length": status_doc.content_length,
|
||||||
"created_at": status_doc.created_at,
|
"created_at": status_doc.created_at,
|
||||||
@@ -573,7 +574,7 @@ class LightRAG:
|
|||||||
]
|
]
|
||||||
try:
|
try:
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
await self.doc_status.upsert(
|
await self.doc_status.update_doc_status(
|
||||||
{
|
{
|
||||||
doc_status_id: {
|
doc_status_id: {
|
||||||
"status": DocStatus.PROCESSED,
|
"status": DocStatus.PROCESSED,
|
||||||
@@ -586,7 +587,7 @@ class LightRAG:
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to process document {doc_id}: {str(e)}")
|
logger.error(f"Failed to process document {doc_id}: {str(e)}")
|
||||||
await self.doc_status.upsert(
|
await self.doc_status.update_doc_status(
|
||||||
{
|
{
|
||||||
doc_status_id: {
|
doc_status_id: {
|
||||||
"status": DocStatus.FAILED,
|
"status": DocStatus.FAILED,
|
||||||
|
Reference in New Issue
Block a user