diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 75de70f2..4cd81a0d 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -1,8 +1,8 @@
 import asyncio
 import json
 import os
-import time
 import datetime
+from datetime import timezone
 from dataclasses import dataclass, field
 from typing import Any, Union, final
 import numpy as np
@@ -545,7 +545,9 @@ class PGVectorStorage(BaseVectorStorage):
             await ClientManager.release_client(self.db)
             self.db = None
 
-    def _upsert_chunks(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_chunks(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         try:
             upsert_sql = SQL_TEMPLATES["upsert_chunk"]
             data: dict[str, Any] = {
@@ -557,6 +559,8 @@ class PGVectorStorage(BaseVectorStorage):
                 "content": item["content"],
                 "content_vector": json.dumps(item["__vector__"].tolist()),
                 "file_path": item["file_path"],
+                "create_time": current_time,
+                "update_time": current_time,
             }
         except Exception as e:
             logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -564,7 +568,9 @@ class PGVectorStorage(BaseVectorStorage):
 
         return upsert_sql, data
 
-    def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_entities(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_entity"]
         source_id = item["source_id"]
         if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -580,10 +586,14 @@ class PGVectorStorage(BaseVectorStorage):
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
             "file_path": item.get("file_path", None),
+            "create_time": current_time,
+            "update_time": current_time,
         }
         return upsert_sql, data
 
-    def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
+    def _upsert_relationships(
+        self, item: dict[str, Any], current_time: datetime.datetime
+    ) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_relationship"]
         source_id = item["source_id"]
         if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -600,6 +610,8 @@ class PGVectorStorage(BaseVectorStorage):
             "content_vector": json.dumps(item["__vector__"].tolist()),
             "chunk_ids": chunk_ids,
             "file_path": item.get("file_path", None),
+            "create_time": current_time,
+            "update_time": current_time,
         }
         return upsert_sql, data
 
@@ -608,11 +620,13 @@ class PGVectorStorage(BaseVectorStorage):
         if not data:
             return
 
-        current_time = time.time()
+        # Get current time with UTC timezone
+        current_time_with_tz = datetime.datetime.now(timezone.utc)
+        # Remove timezone info to avoid timezone mismatch issues
+        current_time = current_time_with_tz.replace(tzinfo=None)
         list_data = [
             {
                 "__id__": k,
-                "__created_at__": current_time,
                 **{k1: v1 for k1, v1 in v.items()},
             }
             for k, v in data.items()
@@ -631,11 +645,11 @@ class PGVectorStorage(BaseVectorStorage):
             d["__vector__"] = embeddings[i]
         for item in list_data:
             if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
-                upsert_sql, data = self._upsert_chunks(item)
+                upsert_sql, data = self._upsert_chunks(item, current_time)
             elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
-                upsert_sql, data = self._upsert_entities(item)
+                upsert_sql, data = self._upsert_entities(item, current_time)
             elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
-                upsert_sql, data = self._upsert_relationships(item)
+                upsert_sql, data = self._upsert_relationships(item, current_time)
             else:
                raise ValueError(f"{self.namespace} is not supported")
 
@@ -776,7 +790,7 @@ class PGVectorStorage(BaseVectorStorage):
             logger.error(f"Unknown namespace for ID lookup: {self.namespace}")
             return None
 
-        query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id=$2"
+        query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2"
         params = {"workspace": self.db.workspace, "id": id}
 
         try:
@@ -806,7 +820,7 @@ class PGVectorStorage(BaseVectorStorage):
             return []
 
         ids_str = ",".join([f"'{id}'" for id in ids])
-        query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
+        query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
        params = {"workspace": self.db.workspace}
 
         try:
@@ -2222,8 +2236,8 @@ TABLES = {
                    doc_name VARCHAR(1024),
                    content TEXT,
                    meta JSONB,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                    CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
                    )"""
    },
@@ -2237,8 +2251,8 @@ TABLES = {
                    content TEXT,
                    content_vector VECTOR,
                    file_path VARCHAR(256),
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                    CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
                    )"""
    },
@@ -2249,8 +2263,8 @@ TABLES = {
                    entity_name VARCHAR(255),
                    content TEXT,
                    content_vector VECTOR,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                    chunk_ids VARCHAR(255)[] NULL,
                    file_path TEXT NULL,
                    CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
@@ -2264,8 +2278,8 @@ TABLES = {
                    target_id VARCHAR(256),
                    content TEXT,
                    content_vector VECTOR,
-                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-                    update_time TIMESTAMP,
+                    create_time TIMESTAMP(0),
+                    update_time TIMESTAMP(0),
                    chunk_ids VARCHAR(255)[] NULL,
                    file_path TEXT NULL,
                    CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
@@ -2341,8 +2355,9 @@ SQL_TEMPLATES = {
                      update_time = CURRENT_TIMESTAMP
                      """,
    "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
-                      chunk_order_index, full_doc_id, content, content_vector, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+                      chunk_order_index, full_doc_id, content, content_vector, file_path,
+                      create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
                      ON CONFLICT (workspace,id) DO UPDATE
                      SET tokens=EXCLUDED.tokens,
                      chunk_order_index=EXCLUDED.chunk_order_index,
@@ -2350,23 +2365,23 @@ SQL_TEMPLATES = {
                      content = EXCLUDED.content,
                      content_vector=EXCLUDED.content_vector,
                      file_path=EXCLUDED.file_path,
-                      update_time = CURRENT_TIMESTAMP
+                      update_time = EXCLUDED.update_time
                      """,
    # SQL for VectorStorage
    "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
-                      content_vector, chunk_ids, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7)
+                      content_vector, chunk_ids, file_path, create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
                      ON CONFLICT (workspace,id) DO UPDATE
                      SET entity_name=EXCLUDED.entity_name,
                      content=EXCLUDED.content,
                      content_vector=EXCLUDED.content_vector,
                      chunk_ids=EXCLUDED.chunk_ids,
                      file_path=EXCLUDED.file_path,
-                      update_time=CURRENT_TIMESTAMP
+                      update_time=EXCLUDED.update_time
                      """,
    "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector, chunk_ids, file_path)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8)
+                      target_id, content, content_vector, chunk_ids, file_path, create_time, update_time)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10)
                      ON CONFLICT (workspace,id) DO UPDATE
                      SET source_id=EXCLUDED.source_id,
                      target_id=EXCLUDED.target_id,
@@ -2374,7 +2389,7 @@ SQL_TEMPLATES = {
                      content_vector=EXCLUDED.content_vector,
                      chunk_ids=EXCLUDED.chunk_ids,
                      file_path=EXCLUDED.file_path,
-                      update_time = CURRENT_TIMESTAMP
+                      update_time = EXCLUDED.update_time
                      """,
    "relationships": """
    WITH relevant_chunks AS (
@@ -2382,9 +2397,9 @@ SQL_TEMPLATES = {
        FROM LIGHTRAG_DOC_CHUNKS
        WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
    )
-    SELECT source_id as src_id, target_id as tgt_id
+    SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
    FROM (
-        SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
+        SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
        FROM LIGHTRAG_VDB_RELATION r
        JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
        WHERE r.workspace=$1
@@ -2399,9 +2414,9 @@ SQL_TEMPLATES = {
        FROM LIGHTRAG_DOC_CHUNKS
        WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
    )
-    SELECT entity_name FROM
+    SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
        (
-        SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
+        SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
        FROM LIGHTRAG_VDB_ENTITY e
        JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
        WHERE e.workspace=$1
@@ -2416,9 +2431,9 @@ SQL_TEMPLATES = {
        FROM LIGHTRAG_DOC_CHUNKS
        WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
    )
-    SELECT id, content, file_path FROM
+    SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
        (
-            SELECT id, content, file_path, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+            SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
            FROM LIGHTRAG_DOC_CHUNKS
            WHERE workspace=$1
            AND id IN (SELECT chunk_id FROM relevant_chunks)
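
Note on the timestamp convention (a sketch, not part of the patch): the write path stores naive UTC datetimes in `TIMESTAMP(0)` columns, and the read path converts them back to integer epochs with `EXTRACT(EPOCH FROM create_time)::BIGINT`. Postgres computes the epoch of a `timestamp without time zone` as if the value were UTC, so the two sides agree only because the Python side strips `tzinfo` from a UTC clock rather than a local one:

```python
from datetime import datetime, timezone

# Write side (mirrors upsert()): UTC wall-clock time with tzinfo stripped, so it
# binds cleanly to a TIMESTAMP (without time zone) column.
current_time = datetime.now(timezone.utc).replace(tzinfo=None)

# Read side equivalent of EXTRACT(EPOCH FROM create_time)::BIGINT: re-attach UTC
# before converting; a naive datetime.timestamp() would assume local time instead.
created_at = int(current_time.replace(tzinfo=timezone.utc).timestamp())
```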
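
Switching the `ON CONFLICT ... DO UPDATE` branches from `update_time = CURRENT_TIMESTAMP` to `update_time = EXCLUDED.update_time` makes the application clock the single source of truth, so every row written in one `upsert()` batch carries the same second-precision timestamp. A hypothetical round-trip check (the temp table and DSN are illustrative; asyncpg is assumed, as the module already depends on it):

```python
import asyncio
from datetime import datetime, timezone

import asyncpg

async def check_created_at(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        await conn.execute(
            "CREATE TEMP TABLE t (id TEXT PRIMARY KEY, create_time TIMESTAMP(0))"
        )
        await conn.execute("INSERT INTO t VALUES ($1, $2)", "x", now)
        row = await conn.fetchrow(
            "SELECT EXTRACT(EPOCH FROM create_time)::BIGINT AS created_at FROM t"
        )
        # TIMESTAMP(0) rounds to whole seconds, so allow one second of slack.
        expected = int(now.replace(tzinfo=timezone.utc).timestamp())
        assert abs(row["created_at"] - expected) <= 1
    finally:
        await conn.close()

asyncio.run(check_created_at("postgresql://localhost/postgres"))
```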
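
For context on the `chunk_ids` values bound by `_upsert_entities` and `_upsert_relationships`: `source_id` concatenates chunk IDs with LightRAG's `<SEP>` separator (`GRAPH_FIELD_SEP`), and the helpers split it into the list stored in the `varchar[]` column. A minimal sketch of that convention (the helper name is illustrative, not part of the module):

```python
def split_chunk_ids(source_id: str, sep: str = "<SEP>") -> list[str]:
    # "chunk-1<SEP>chunk-2" -> ["chunk-1", "chunk-2"]; a single ID passes
    # through as a one-element list, matching the inline logic in the helpers.
    return source_id.split(sep) if sep in source_id else [source_id]

assert split_chunk_ids("chunk-1<SEP>chunk-2") == ["chunk-1", "chunk-2"]
assert split_chunk_ids("chunk-1") == ["chunk-1"]
```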