From dfd19b8d27e1079f2584c0e8be14f8ccbe27a97c Mon Sep 17 00:00:00 2001
From: zrguo
Date: Mon, 17 Mar 2025 23:59:47 +0800
Subject: [PATCH] fix postgres support

---
 lightrag/kg/postgres_impl.py | 30 ++++++++++++++++++++++--------
 lightrag/operate.py          | 16 ++++++----------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index d2630659..a862b8b6 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -423,6 +423,7 @@ class PGVectorStorage(BaseVectorStorage):
                 "full_doc_id": item["full_doc_id"],
                 "content": item["content"],
                 "content_vector": json.dumps(item["__vector__"].tolist()),
+                "file_path": item["file_path"],
             }
         except Exception as e:
             logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -445,6 +446,7 @@ class PGVectorStorage(BaseVectorStorage):
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
+            "file_path": item["file_path"],
             # TODO: add document_id
         }
         return upsert_sql, data
@@ -465,6 +467,7 @@ class PGVectorStorage(BaseVectorStorage):
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
+            "file_path": item["file_path"],
             # TODO: add document_id
         }
         return upsert_sql, data
@@ -740,6 +743,7 @@ class PGDocStatusStorage(DocStatusStorage):
                 chunks_count=result[0]["chunks_count"],
                 created_at=result[0]["created_at"],
                 updated_at=result[0]["updated_at"],
+                file_path=result[0]["file_path"],
             )

     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -774,6 +778,7 @@ class PGDocStatusStorage(DocStatusStorage):
                     created_at=element["created_at"],
                     updated_at=element["updated_at"],
                     chunks_count=element["chunks_count"],
+                    file_path=element["file_path"],
                 )
                 for element in result
             }
@@ -793,14 +798,15 @@ class PGDocStatusStorage(DocStatusStorage):
         if not data:
             return

-        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status)
-                 values($1,$2,$3,$4,$5,$6,$7)
+        sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path)
+                 values($1,$2,$3,$4,$5,$6,$7,$8)
                  on conflict(id,workspace) do update set
                  content = EXCLUDED.content,
                  content_summary = EXCLUDED.content_summary,
                  content_length = EXCLUDED.content_length,
                  chunks_count = EXCLUDED.chunks_count,
                  status = EXCLUDED.status,
+                 file_path = EXCLUDED.file_path,
                  updated_at = CURRENT_TIMESTAMP"""
         for k, v in data.items():
             # chunks_count is optional
@@ -814,6 +820,7 @@ class PGDocStatusStorage(DocStatusStorage):
                     "content_length": v["content_length"],
                     "chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
                     "status": v["status"],
+                    "file_path": v["file_path"],
                 },
             )

@@ -1549,6 +1556,7 @@ TABLES = {
                    tokens INTEGER,
                    content TEXT,
                    content_vector VECTOR,
+                    file_path VARCHAR(256),
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
                    CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -1564,6 +1572,7 @@ TABLES = {
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
                    chunk_id TEXT NULL,
+                    file_path TEXT NULL,
                    CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
                    )"""
     },
@@ -1578,6 +1587,7 @@ TABLES = {
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
                    chunk_id TEXT NULL,
+                    file_path TEXT NULL,
                    CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
                    )"""
     },
@@ -1602,6 +1612,7 @@ TABLES = {
	           content_length int4 NULL,
	           chunks_count int4 NULL,
	           status varchar(64) NULL,
+	           file_path TEXT NULL,
	           created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
	           updated_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
	           CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
@@ -1650,35 +1661,38 @@ SQL_TEMPLATES = {
                      update_time = CURRENT_TIMESTAMP
                     """,
     "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
-                      chunk_order_index, full_doc_id, content, content_vector)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7)
+                      chunk_order_index, full_doc_id, content, content_vector, file_path)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET tokens=EXCLUDED.tokens,
                       chunk_order_index=EXCLUDED.chunk_order_index,
                       full_doc_id=EXCLUDED.full_doc_id,
                       content = EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
+                      file_path=EXCLUDED.file_path,
                       update_time = CURRENT_TIMESTAMP
                      """,
     "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
-                      content_vector, chunk_ids)
-                      VALUES ($1, $2, $3, $4, $5, $6::varchar[])
+                      content_vector, chunk_ids, file_path)
+                      VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET entity_name=EXCLUDED.entity_name,
                       content=EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
                       chunk_ids=EXCLUDED.chunk_ids,
+                      file_path=EXCLUDED.file_path,
                       update_time=CURRENT_TIMESTAMP
                      """,
     "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector, chunk_ids)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[])
+                      target_id, content, content_vector, chunk_ids, file_path)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET source_id=EXCLUDED.source_id,
                       target_id=EXCLUDED.target_id,
                       content=EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
                       chunk_ids=EXCLUDED.chunk_ids,
+                      file_path=EXCLUDED.file_path,
                       update_time = CURRENT_TIMESTAMP
                      """,
     # SQL for VectorStorage
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 11a09e40..ec450a1d 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -677,12 +677,10 @@ async def extract_entities(
             "entity_type": dp["entity_type"],
             "content": f"{dp['entity_name']}\n{dp['description']}",
             "source_id": dp["source_id"],
-            "file_path": dp.get("metadata", {}).get("file_path", "unknown_source"),
+            "file_path": dp.get("file_path", "unknown_source"),
             "metadata": {
-                "created_at": dp.get("metadata", {}).get("created_at", time.time()),
-                "file_path": dp.get("metadata", {}).get(
-                    "file_path", "unknown_source"
-                ),
+                "created_at": dp.get("created_at", time.time()),
+                "file_path": dp.get("file_path", "unknown_source"),
             },
         }
         for dp in all_entities_data
@@ -697,12 +695,10 @@ async def extract_entities(
             "keywords": dp["keywords"],
             "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
             "source_id": dp["source_id"],
-            "file_path": dp.get("metadata", {}).get("file_path", "unknown_source"),
+            "file_path": dp.get("file_path", "unknown_source"),
             "metadata": {
-                "created_at": dp.get("metadata", {}).get("created_at", time.time()),
-                "file_path": dp.get("metadata", {}).get(
-                    "file_path", "unknown_source"
-                ),
+                "created_at": dp.get("created_at", time.time()),
+                "file_path": dp.get("file_path", "unknown_source"),
             },
         }
         for dp in all_relationships_data
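
A possible follow-up for existing deployments (illustrative only, not part of this patch): the change only adds the file_path column to the CREATE TABLE templates in TABLES, so a PostgreSQL schema created by an earlier version would still lack the column and the new INSERT ... file_path statements would fail against it. A minimal migration sketch, assuming the default table names above and PostgreSQL 9.6+ for ADD COLUMN IF NOT EXISTS:

    -- hypothetical manual migration for databases created before this patch
    ALTER TABLE LIGHTRAG_DOC_CHUNKS   ADD COLUMN IF NOT EXISTS file_path VARCHAR(256);
    ALTER TABLE LIGHTRAG_VDB_ENTITY   ADD COLUMN IF NOT EXISTS file_path TEXT NULL;
    ALTER TABLE LIGHTRAG_VDB_RELATION ADD COLUMN IF NOT EXISTS file_path TEXT NULL;
    ALTER TABLE LIGHTRAG_DOC_STATUS   ADD COLUMN IF NOT EXISTS file_path TEXT NULL;

The column types mirror the updated DDL in the patch; a fresh install picks them up automatically from TABLES.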