Fix created_at problem for TiDB vector storage

yangdx
2025-05-03 00:46:10 +08:00
parent 81071df982
commit debe8f329a


@@ -44,7 +44,66 @@ class TiDB:
logger.error(f"TiDB database error: {e}")
raise
async def _migrate_timestamp_columns(self):
"""将表中的时间戳列迁移为带时区的类型假设原始数据是UTC时间"""
# Tables and columns that need to be migrated
tables_to_migrate = {
"LIGHTRAG_GRAPH_NODES": ["createtime", "updatetime"],
"LIGHTRAG_GRAPH_EDGES": ["createtime", "updatetime"],
"LIGHTRAG_DOC_CHUNKS": ["createtime", "updatetime"],
}
for table_name, columns in tables_to_migrate.items():
for column_name in columns:
try:
# Check whether the column exists
check_column_sql = f"""
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}'
AND COLUMN_NAME = '{column_name}'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.warning(
f"{table_name}.{column_name} 不存在,跳过迁移"
)
continue
# Check the column type
data_type = column_info.get("DATA_TYPE", "").lower()
column_type = column_info.get("COLUMN_TYPE", "").lower()
# If it is already a timestamp type, check whether it carries timezone info
if data_type == "timestamp" and "time zone" in column_type:
logger.info(
f"{table_name}.{column_name} 已经是带时区的timestamp类型无需迁移"
)
continue
# If it is a datetime type, it needs to be migrated to timestamp
if data_type == "datetime" or (
data_type == "timestamp" and "time zone" not in column_type
):
logger.info(
f"正在迁移 {table_name}.{column_name} 到timestamp类型"
)
migration_sql = f"""
ALTER TABLE {table_name}
MODIFY COLUMN {column_name} TIMESTAMP
"""
await self.execute(migration_sql)
logger.info(
f"成功迁移 {table_name}.{column_name} 到timestamp类型"
)
except Exception as e:
# Log the error but do not interrupt the process
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
async def check_tables(self):
# First, create all tables
for k, v in TABLES.items():
try:
await self.query(f"SELECT 1 FROM {k}".format(k=k))
@@ -58,6 +117,13 @@ class TiDB:
logger.error(f"Failed to create table {k} in TiDB database")
logger.error(f"TiDB database error: {e}")
# After all tables are created, try to migrate the timestamp columns
try:
await self._migrate_timestamp_columns()
except Exception as e:
logger.error(f"TiDB, Failed to migrate timestamp columns: {e}")
# 不抛出异常,允许初始化过程继续
async def query(
self, sql: str, params: dict = None, multirows: bool = False
) -> Union[dict, None]:
@@ -244,6 +310,11 @@ class TiDBKVStorage(BaseKVStorage):
for i, d in enumerate(list_data):
d["__vector__"] = embeddings[i]
# Get current time as UNIX timestamp
import time
current_time = int(time.time())
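# Epoch seconds are timezone-independent; the upsert SQL converts them to TIMESTAMP values with FROM_UNIXTIME.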
merge_sql = SQL_TEMPLATES["upsert_chunk"]
data = []
for item in list_data:
@@ -256,6 +327,7 @@ class TiDBKVStorage(BaseKVStorage):
"full_doc_id": item["full_doc_id"],
"content_vector": f"{item['__vector__'].tolist()}",
"workspace": self.db.workspace,
"timestamp": current_time,
}
)
await self.db.execute(merge_sql, data)
@@ -406,7 +478,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
results = await self.db.query(
SQL_TEMPLATES[self.namespace], params=params, multirows=True
)
print("vector search result:", results)
if not results:
return []
return results
@@ -416,22 +487,18 @@ class TiDBVectorDBStorage(BaseVectorStorage):
logger.info(f"Inserting {len(data)} to {self.namespace}")
if not data:
return
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
return
logger.info(f"Inserting {len(data)} vectors to {self.namespace}")
# Get current time with UTC timezone
import datetime
from datetime import timezone
current_time_with_tz = datetime.datetime.now(timezone.utc)
# Remove timezone info to avoid timezone mismatch issues
current_time = current_time_with_tz.replace(tzinfo=None)
# Get current time as UNIX timestamp
import time
current_time = int(time.time())
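# The same epoch value is used for created_at and timestamp, so createtime and updatetime match on first insert (see FROM_UNIXTIME in the upsert templates).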
list_data = [
{
"id": k,
"created_at": current_time,
"timestamp": current_time,
**{k1: v1 for k1, v1 in v.items()},
}
for k, v in data.items()
@@ -448,8 +515,20 @@ class TiDBVectorDBStorage(BaseVectorStorage):
for i, d in enumerate(list_data):
d["content_vector"] = embeddings[i]
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
data = []
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
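# Chunks are now upserted here; the early return that skipped VECTOR_STORE_CHUNKS has been removed.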
for item in list_data:
param = {
"id": item["id"],
"content": item["content"],
"tokens": item.get("tokens", 0),
"chunk_order_index": item.get("chunk_order_index", 0),
"full_doc_id": item.get("full_doc_id", ""),
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
await self.db.execute(SQL_TEMPLATES["upsert_chunk"], param)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
for item in list_data:
param = {
"id": item["id"],
@@ -457,20 +536,10 @@ class TiDBVectorDBStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
# Update entity_id if the node was inserted by the graph storage instance earlier
has = await self.db.query(SQL_TEMPLATES["has_entity"], param)
if has["cnt"] != 0:
await self.db.execute(SQL_TEMPLATES["update_entity"], param)
continue
data.append(param)
if data:
merge_sql = SQL_TEMPLATES["insert_entity"]
await self.db.execute(merge_sql, data)
await self.db.execute(SQL_TEMPLATES["upsert_entity"], param)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
data = []
for item in list_data:
param = {
"id": item["id"],
@@ -479,17 +548,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
# Update relation_id if the edge was inserted by the graph storage instance earlier
has = await self.db.query(SQL_TEMPLATES["has_relationship"], param)
if has["cnt"] != 0:
await self.db.execute(SQL_TEMPLATES["update_relationship"], param)
continue
data.append(param)
if data:
merge_sql = SQL_TEMPLATES["insert_relationship"]
await self.db.execute(merge_sql, data)
await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)
async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
@@ -1085,8 +1146,8 @@ TABLES = {
`tokens` INT,
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
UNIQUE KEY (`chunk_id`)
);
"""
@@ -1103,8 +1164,8 @@ TABLES = {
`source_chunk_id` VARCHAR(256),
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
KEY (`entity_id`)
);
"""
@@ -1123,8 +1184,8 @@ TABLES = {
`source_chunk_id` varchar(256),
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
KEY (`relation_id`)
);
"""
@@ -1158,11 +1219,11 @@ SQL_TEMPLATES = {
ON DUPLICATE KEY UPDATE content = VALUES(content), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
""",
"upsert_chunk": """
INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace)
VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace)
INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace, createtime, updatetime)
VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content), tokens = VALUES(tokens), chunk_order_index = VALUES(chunk_order_index),
full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = FROM_UNIXTIME(:timestamp)
""",
# SQL for VectorStorage
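# Epoch round-trip: FROM_UNIXTIME stores :timestamp as a TIMESTAMP on write, and UNIX_TIMESTAMP converts createtime back to epoch seconds on read.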
"entities": """SELECT n.name as entity_name, UNIX_TIMESTAMP(n.createtime) as created_at FROM
@@ -1186,23 +1247,21 @@ SQL_TEMPLATES = {
"has_relationship": """
SELECT COUNT(id) AS cnt FROM LIGHTRAG_GRAPH_EDGES WHERE source_name = :source_name AND target_name = :target_name AND workspace = :workspace
""",
"update_entity": """
UPDATE LIGHTRAG_GRAPH_NODES SET
entity_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
WHERE workspace = :workspace AND name = :name
"upsert_entity": """
INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace, createtime, updatetime)
VALUES(:id, :name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content),
content_vector = VALUES(content_vector),
updatetime = FROM_UNIXTIME(:timestamp)
""",
"update_relationship": """
UPDATE LIGHTRAG_GRAPH_EDGES SET
relation_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
WHERE workspace = :workspace AND source_name = :source_name AND target_name = :target_name
""",
"insert_entity": """
INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace)
VALUES(:id, :name, :content, :content_vector, :workspace)
""",
"insert_relationship": """
INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace)
VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace)
"upsert_relationship": """
INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace, createtime, updatetime)
VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content),
content_vector = VALUES(content_vector),
updatetime = FROM_UNIXTIME(:timestamp)
""",
# SQL for GraphStorage
"get_node": """