Fix time handling bugs for PostgreSQL

This commit is contained in:
yangdx
2025-05-01 15:13:42 +08:00
parent 8c7e9aceb2
commit 1b049b4214

View File

@@ -1,8 +1,8 @@
import asyncio import asyncio
import json import json
import os import os
import time
import datetime import datetime
from datetime import timezone
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Union, final from typing import Any, Union, final
import numpy as np import numpy as np
@@ -545,7 +545,9 @@ class PGVectorStorage(BaseVectorStorage):
await ClientManager.release_client(self.db) await ClientManager.release_client(self.db)
self.db = None self.db = None
def _upsert_chunks(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: def _upsert_chunks(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
try: try:
upsert_sql = SQL_TEMPLATES["upsert_chunk"] upsert_sql = SQL_TEMPLATES["upsert_chunk"]
data: dict[str, Any] = { data: dict[str, Any] = {
@@ -557,6 +559,8 @@ class PGVectorStorage(BaseVectorStorage):
"content": item["content"], "content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()), "content_vector": json.dumps(item["__vector__"].tolist()),
"file_path": item["file_path"], "file_path": item["file_path"],
"create_time": current_time,
"update_time": current_time,
} }
except Exception as e: except Exception as e:
logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}") logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -564,7 +568,9 @@ class PGVectorStorage(BaseVectorStorage):
return upsert_sql, data return upsert_sql, data
def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: def _upsert_entities(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
upsert_sql = SQL_TEMPLATES["upsert_entity"] upsert_sql = SQL_TEMPLATES["upsert_entity"]
source_id = item["source_id"] source_id = item["source_id"]
if isinstance(source_id, str) and "<SEP>" in source_id: if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -580,10 +586,14 @@ class PGVectorStorage(BaseVectorStorage):
"content_vector": json.dumps(item["__vector__"].tolist()), "content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids, "chunk_ids": chunk_ids,
"file_path": item.get("file_path", None), "file_path": item.get("file_path", None),
"create_time": current_time,
"update_time": current_time,
} }
return upsert_sql, data return upsert_sql, data
def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: def _upsert_relationships(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
upsert_sql = SQL_TEMPLATES["upsert_relationship"] upsert_sql = SQL_TEMPLATES["upsert_relationship"]
source_id = item["source_id"] source_id = item["source_id"]
if isinstance(source_id, str) and "<SEP>" in source_id: if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -600,6 +610,8 @@ class PGVectorStorage(BaseVectorStorage):
"content_vector": json.dumps(item["__vector__"].tolist()), "content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids, "chunk_ids": chunk_ids,
"file_path": item.get("file_path", None), "file_path": item.get("file_path", None),
"create_time": current_time,
"update_time": current_time,
} }
return upsert_sql, data return upsert_sql, data
@@ -608,11 +620,13 @@ class PGVectorStorage(BaseVectorStorage):
if not data: if not data:
return return
current_time = time.time() # Get current time with UTC timezone
current_time_with_tz = datetime.datetime.now(timezone.utc)
# Drop tzinfo so the naive UTC value matches the TIMESTAMP (without time zone) columns
current_time = current_time_with_tz.replace(tzinfo=None)
list_data = [ list_data = [
{ {
"__id__": k, "__id__": k,
"__created_at__": current_time,
**{k1: v1 for k1, v1 in v.items()}, **{k1: v1 for k1, v1 in v.items()},
} }
for k, v in data.items() for k, v in data.items()
@@ -631,11 +645,11 @@ class PGVectorStorage(BaseVectorStorage):
d["__vector__"] = embeddings[i] d["__vector__"] = embeddings[i]
for item in list_data: for item in list_data:
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS): if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
upsert_sql, data = self._upsert_chunks(item) upsert_sql, data = self._upsert_chunks(item, current_time)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES): elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
upsert_sql, data = self._upsert_entities(item) upsert_sql, data = self._upsert_entities(item, current_time)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS): elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
upsert_sql, data = self._upsert_relationships(item) upsert_sql, data = self._upsert_relationships(item, current_time)
else: else:
raise ValueError(f"{self.namespace} is not supported") raise ValueError(f"{self.namespace} is not supported")
@@ -776,7 +790,7 @@ class PGVectorStorage(BaseVectorStorage):
logger.error(f"Unknown namespace for ID lookup: {self.namespace}") logger.error(f"Unknown namespace for ID lookup: {self.namespace}")
return None return None
query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id=$2" query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2"
params = {"workspace": self.db.workspace, "id": id} params = {"workspace": self.db.workspace, "id": id}
try: try:
@@ -806,7 +820,7 @@ class PGVectorStorage(BaseVectorStorage):
return [] return []
ids_str = ",".join([f"'{id}'" for id in ids]) ids_str = ",".join([f"'{id}'" for id in ids])
query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})" query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
params = {"workspace": self.db.workspace} params = {"workspace": self.db.workspace}
try: try:
@@ -2222,8 +2236,8 @@ TABLES = {
doc_name VARCHAR(1024), doc_name VARCHAR(1024),
content TEXT, content TEXT,
meta JSONB, meta JSONB,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP(0),
update_time TIMESTAMP, update_time TIMESTAMP(0),
CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@@ -2237,8 +2251,8 @@ TABLES = {
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
file_path VARCHAR(256), file_path VARCHAR(256),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP(0),
update_time TIMESTAMP, update_time TIMESTAMP(0),
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
)""" )"""
}, },
@@ -2249,8 +2263,8 @@ TABLES = {
entity_name VARCHAR(255), entity_name VARCHAR(255),
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP(0),
update_time TIMESTAMP, update_time TIMESTAMP(0),
chunk_ids VARCHAR(255)[] NULL, chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL, file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
@@ -2264,8 +2278,8 @@ TABLES = {
target_id VARCHAR(256), target_id VARCHAR(256),
content TEXT, content TEXT,
content_vector VECTOR, content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, create_time TIMESTAMP(0),
update_time TIMESTAMP, update_time TIMESTAMP(0),
chunk_ids VARCHAR(255)[] NULL, chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL, file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id) CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
@@ -2341,8 +2355,9 @@ SQL_TEMPLATES = {
update_time = CURRENT_TIMESTAMP update_time = CURRENT_TIMESTAMP
""", """,
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens, "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path) chunk_order_index, full_doc_id, content, content_vector, file_path,
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (workspace,id) DO UPDATE ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens, SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index, chunk_order_index=EXCLUDED.chunk_order_index,
@@ -2350,23 +2365,23 @@ SQL_TEMPLATES = {
content = EXCLUDED.content, content = EXCLUDED.content,
content_vector=EXCLUDED.content_vector, content_vector=EXCLUDED.content_vector,
file_path=EXCLUDED.file_path, file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP update_time = EXCLUDED.update_time
""", """,
# SQL for VectorStorage # SQL for VectorStorage
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content, "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
content_vector, chunk_ids, file_path) content_vector, chunk_ids, file_path, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7) VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
ON CONFLICT (workspace,id) DO UPDATE ON CONFLICT (workspace,id) DO UPDATE
SET entity_name=EXCLUDED.entity_name, SET entity_name=EXCLUDED.entity_name,
content=EXCLUDED.content, content=EXCLUDED.content,
content_vector=EXCLUDED.content_vector, content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids, chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path, file_path=EXCLUDED.file_path,
update_time=CURRENT_TIMESTAMP update_time=EXCLUDED.update_time
""", """,
"upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id, "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
target_id, content, content_vector, chunk_ids, file_path) target_id, content, content_vector, chunk_ids, file_path, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8) VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10)
ON CONFLICT (workspace,id) DO UPDATE ON CONFLICT (workspace,id) DO UPDATE
SET source_id=EXCLUDED.source_id, SET source_id=EXCLUDED.source_id,
target_id=EXCLUDED.target_id, target_id=EXCLUDED.target_id,
@@ -2374,7 +2389,7 @@ SQL_TEMPLATES = {
content_vector=EXCLUDED.content_vector, content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids, chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path, file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP update_time = EXCLUDED.update_time
""", """,
"relationships": """ "relationships": """
WITH relevant_chunks AS ( WITH relevant_chunks AS (
@@ -2382,9 +2397,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[]) WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
) )
SELECT source_id as src_id, target_id as tgt_id SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
FROM ( FROM (
SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_RELATION r FROM LIGHTRAG_VDB_RELATION r
JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids) JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
WHERE r.workspace=$1 WHERE r.workspace=$1
@@ -2399,9 +2414,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[]) WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
) )
SELECT entity_name FROM SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
( (
SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_ENTITY e FROM LIGHTRAG_VDB_ENTITY e
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids) JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
WHERE e.workspace=$1 WHERE e.workspace=$1
@@ -2416,9 +2431,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[]) WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
) )
SELECT id, content, file_path FROM SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
( (
SELECT id, content, file_path, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_DOC_CHUNKS FROM LIGHTRAG_DOC_CHUNKS
WHERE workspace=$1 WHERE workspace=$1
AND id IN (SELECT chunk_id FROM relevant_chunks) AND id IN (SELECT chunk_id FROM relevant_chunks)