diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py
index 135ca9cc..d875f134 100644
--- a/lightrag/kg/tidb_impl.py
+++ b/lightrag/kg/tidb_impl.py
@@ -44,7 +44,66 @@ class TiDB:
             logger.error(f"TiDB database error: {e}")
             raise

+    async def _migrate_timestamp_columns(self):
+        """Migrate the tables' timestamp columns to a timezone-aware type, assuming existing data is in UTC"""
+        # Tables and columns that need migration
+        tables_to_migrate = {
+            "LIGHTRAG_GRAPH_NODES": ["createtime", "updatetime"],
+            "LIGHTRAG_GRAPH_EDGES": ["createtime", "updatetime"],
+            "LIGHTRAG_DOC_CHUNKS": ["createtime", "updatetime"],
+        }
+
+        for table_name, columns in tables_to_migrate.items():
+            for column_name in columns:
+                try:
+                    # Check whether the column exists
+                    check_column_sql = f"""
+                    SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
+                    FROM INFORMATION_SCHEMA.COLUMNS
+                    WHERE TABLE_NAME = '{table_name}'
+                    AND COLUMN_NAME = '{column_name}'
+                    """
+
+                    column_info = await self.query(check_column_sql)
+                    if not column_info:
+                        logger.warning(
+                            f"Column {table_name}.{column_name} does not exist, skipping migration"
+                        )
+                        continue
+
+                    # Check the column type
+                    data_type = column_info.get("DATA_TYPE", "").lower()
+                    column_type = column_info.get("COLUMN_TYPE", "").lower()
+
+                    # If it is already a timestamp type, check whether it carries timezone info
+                    if data_type == "timestamp" and "time zone" in column_type:
+                        logger.info(
+                            f"Column {table_name}.{column_name} is already a timezone-aware timestamp type, no migration needed"
+                        )
+                        continue
+
+                    # If it is a datetime type, it needs to be migrated to timestamp
+                    if data_type == "datetime" or (
+                        data_type == "timestamp" and "time zone" not in column_type
+                    ):
+                        logger.info(
+                            f"Migrating {table_name}.{column_name} to the timestamp type"
+                        )
+                        migration_sql = f"""
+                        ALTER TABLE {table_name}
+                        MODIFY COLUMN {column_name} TIMESTAMP
+                        """
+
+                        await self.execute(migration_sql)
+                        logger.info(
+                            f"Successfully migrated {table_name}.{column_name} to the timestamp type"
+                        )
+                except Exception as e:
+                    # Log the error but do not interrupt the process
+                    logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
+
     async def check_tables(self):
+        # First create all tables
         for k, v in TABLES.items():
             try:
                 await self.query(f"SELECT 1 FROM {k}".format(k=k))
@@ -58,6 +117,13 @@ class TiDB:
                 logger.error(f"Failed to create table {k} in TiDB database")
                 logger.error(f"TiDB database error: {e}")

+        # After all tables are created, try to migrate the time columns
+        try:
+            await self._migrate_timestamp_columns()
+        except Exception as e:
+            logger.error(f"TiDB: Failed to migrate timestamp columns: {e}")
+            # Do not raise, so the initialization process can continue
+
     async def query(
         self, sql: str, params: dict = None, multirows: bool = False
     ) -> Union[dict, None]:
@@ -244,6 +310,11 @@ class TiDBKVStorage(BaseKVStorage):
             for i, d in enumerate(list_data):
                 d["__vector__"] = embeddings[i]

+        # Get current time as UNIX timestamp
+        import time
+
+        current_time = int(time.time())
+
         merge_sql = SQL_TEMPLATES["upsert_chunk"]
         data = []
         for item in list_data:
@@ -256,6 +327,7 @@ class TiDBKVStorage(BaseKVStorage):
                     "full_doc_id": item["full_doc_id"],
                     "content_vector": f"{item['__vector__'].tolist()}",
                     "workspace": self.db.workspace,
+                    "timestamp": current_time,
                 }
             )
         await self.db.execute(merge_sql, data)
@@ -406,7 +478,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         results = await self.db.query(
             SQL_TEMPLATES[self.namespace], params=params, multirows=True
         )
-        print("vector search result:", results)
         if not results:
             return []
         return results
@@ -416,22 +487,18 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         logger.info(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return
-        if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
-            return
         logger.info(f"Inserting {len(data)} vectors to {self.namespace}")

-        # Get current time with UTC timezone
-        import datetime
-        from datetime import timezone
-        current_time_with_tz = datetime.datetime.now(timezone.utc)
-        # Remove timezone info to avoid timezone mismatch issues
-        current_time = current_time_with_tz.replace(tzinfo=None)
+        # Get current time as UNIX timestamp
+        import time
+
+        current_time = int(time.time())

         list_data = [
             {
                 "id": k,
-                "created_at": current_time,
+                "timestamp": current_time,
                 **{k1: v1 for k1, v1 in v.items()},
             }
             for k, v in data.items()
@@ -448,8 +515,20 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         for i, d in enumerate(list_data):
             d["content_vector"] = embeddings[i]

-        if is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
-            data = []
+        if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
+            for item in list_data:
+                param = {
+                    "id": item["id"],
+                    "content": item["content"],
+                    "tokens": item.get("tokens", 0),
+                    "chunk_order_index": item.get("chunk_order_index", 0),
+                    "full_doc_id": item.get("full_doc_id", ""),
+                    "content_vector": f"{item['content_vector'].tolist()}",
+                    "workspace": self.db.workspace,
+                    "timestamp": item["timestamp"],
+                }
+                await self.db.execute(SQL_TEMPLATES["upsert_chunk"], param)
+        elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
             for item in list_data:
                 param = {
                     "id": item["id"],
@@ -457,20 +536,10 @@ class TiDBVectorDBStorage(BaseVectorStorage):
                     "content": item["content"],
                     "content_vector": f"{item['content_vector'].tolist()}",
                     "workspace": self.db.workspace,
+                    "timestamp": item["timestamp"],
                 }
-                # update entity_id if node inserted by graph_storage_instance before
-                has = await self.db.query(SQL_TEMPLATES["has_entity"], param)
-                if has["cnt"] != 0:
-                    await self.db.execute(SQL_TEMPLATES["update_entity"], param)
-                    continue
-
-                data.append(param)
-            if data:
-                merge_sql = SQL_TEMPLATES["insert_entity"]
-                await self.db.execute(merge_sql, data)
-
+                await self.db.execute(SQL_TEMPLATES["upsert_entity"], param)
         elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
-            data = []
             for item in list_data:
                 param = {
                     "id": item["id"],
@@ -479,17 +548,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
                     "content": item["content"],
                     "content_vector": f"{item['content_vector'].tolist()}",
                     "workspace": self.db.workspace,
+                    "timestamp": item["timestamp"],
                 }
-                # update relation_id if node inserted by graph_storage_instance before
-                has = await self.db.query(SQL_TEMPLATES["has_relationship"], param)
-                if has["cnt"] != 0:
-                    await self.db.execute(SQL_TEMPLATES["update_relationship"], param)
-                    continue
-
-                data.append(param)
-            if data:
-                merge_sql = SQL_TEMPLATES["insert_relationship"]
-                await self.db.execute(merge_sql, data)
+                await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)

     async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
         SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
@@ -630,7 +691,7 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         # Determine which table to query based on namespace
         if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
             sql_template = """
-                SELECT entity_id as id, name as entity_name, entity_type, description, content,
+                SELECT entity_id as id, name as entity_name, entity_type, description, content, UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_GRAPH_NODES
                 WHERE entity_id = :entity_id AND workspace = :workspace
@@ -1085,8 +1146,8 @@ TABLES = {
         `tokens` INT,
         `content` LONGTEXT,
         `content_vector` VECTOR,
-        `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-        `updatetime` DATETIME DEFAULT NULL,
+        `createtime` TIMESTAMP,
+        `updatetime` TIMESTAMP,
         UNIQUE KEY (`chunk_id`)
     );
     """
@@ -1103,8 +1164,8 @@ TABLES = {
         `source_chunk_id` VARCHAR(256),
         `content` LONGTEXT,
         `content_vector` VECTOR,
-        `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-        `updatetime` DATETIME DEFAULT NULL,
+        `createtime` TIMESTAMP,
+        `updatetime` TIMESTAMP,
         KEY (`entity_id`)
     );
     """
@@ -1123,8 +1184,8 @@ TABLES = {
         `source_chunk_id` varchar(256),
         `content` LONGTEXT,
         `content_vector` VECTOR,
-        `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-        `updatetime` DATETIME DEFAULT NULL,
+        `createtime` TIMESTAMP,
+        `updatetime` TIMESTAMP,
         KEY (`relation_id`)
     );
     """
@@ -1158,11 +1219,11 @@ SQL_TEMPLATES = {
         ON DUPLICATE KEY UPDATE content = VALUES(content), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
     """,
     "upsert_chunk": """
-        INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace)
-        VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace)
+        INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace, createtime, updatetime)
+        VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
         ON DUPLICATE KEY UPDATE content = VALUES(content), tokens = VALUES(tokens), chunk_order_index = VALUES(chunk_order_index),
-        full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
+        full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = FROM_UNIXTIME(:timestamp)
     """,
     # SQL for VectorStorage
     "entities": """SELECT n.name as entity_name, UNIX_TIMESTAMP(n.createtime) as created_at FROM
@@ -1186,23 +1247,21 @@ SQL_TEMPLATES = {
     "has_relationship": """
         SELECT COUNT(id) AS cnt FROM LIGHTRAG_GRAPH_EDGES WHERE source_name = :source_name AND target_name = :target_name AND workspace = :workspace
     """,
-    "update_entity": """
-        UPDATE LIGHTRAG_GRAPH_NODES SET
-            entity_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
-        WHERE workspace = :workspace AND name = :name
+    "upsert_entity": """
+        INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace, createtime, updatetime)
+        VALUES(:id, :name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
+        ON DUPLICATE KEY UPDATE
+            content = VALUES(content),
+            content_vector = VALUES(content_vector),
+            updatetime = FROM_UNIXTIME(:timestamp)
     """,
-    "update_relationship": """
-        UPDATE LIGHTRAG_GRAPH_EDGES SET
-            relation_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
-        WHERE workspace = :workspace AND source_name = :source_name AND target_name = :target_name
-    """,
-    "insert_entity": """
-        INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace)
-        VALUES(:id, :name, :content, :content_vector, :workspace)
-    """,
-    "insert_relationship": """
-        INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace)
-        VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace)
+    "upsert_relationship": """
+        INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace, createtime, updatetime)
+        VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
+        ON DUPLICATE KEY UPDATE
+            content = VALUES(content),
+            content_vector = VALUES(content_vector),
+            updatetime = FROM_UNIXTIME(:timestamp)
     """,
     # SQL for GraphStorage
     "get_node": """
@@ -1276,7 +1335,7 @@ SQL_TEMPLATES = {
     """,
     # Search by prefix SQL templates
     "search_entity_by_prefix": """
-        SELECT entity_id as id, name as entity_name, entity_type, description, content,
+        SELECT entity_id as id, name as entity_name, entity_type, description, content, UNIX_TIMESTAMP(createtime) as created_at
         FROM LIGHTRAG_GRAPH_NODES
         WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
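
Note on the timestamp convention this diff settles on (illustrative commentary, not part of the patch): the Python side now only ever hands TiDB an integer UNIX timestamp, writes convert it server-side with FROM_UNIXTIME(:timestamp), and reads convert back with UNIX_TIMESTAMP(createtime). Because the two functions invert each other through the session time zone, the integer read back equals the integer written regardless of what time_zone either connection uses. A minimal sketch of that round-trip, assuming it runs inside lightrag/kg/tidb_impl.py (so SQL_TEMPLATES is in scope), a connected `db` wrapper exposing the query/execute methods shown above, and a hypothetical `chunk` dict shaped like the upsert parameters:

import time

async def upsert_and_read_chunk(db, chunk: dict, workspace: str) -> int:
    # Write path: send only an integer UNIX timestamp (seconds since the
    # epoch, UTC by definition); FROM_UNIXTIME in upsert_chunk converts it.
    written = int(time.time())
    param = {
        "id": chunk["id"],
        "content": chunk["content"],
        "tokens": chunk.get("tokens", 0),
        "chunk_order_index": chunk.get("chunk_order_index", 0),
        "full_doc_id": chunk.get("full_doc_id", ""),
        "content_vector": f"{chunk['content_vector'].tolist()}",
        "workspace": workspace,
        "timestamp": written,
    }
    await db.execute(SQL_TEMPLATES["upsert_chunk"], param)

    # Read path: UNIX_TIMESTAMP(createtime) undoes the conversion, so
    # read_back == written holds regardless of the session time zone.
    row = await db.query(
        "SELECT UNIX_TIMESTAMP(createtime) AS created_at "
        "FROM LIGHTRAG_DOC_CHUNKS WHERE chunk_id = :id AND workspace = :workspace",
        params={"id": chunk["id"], "workspace": workspace},
    )
    read_back = int(row["created_at"])
    return read_back

This round-trip is what lets the migration to TIMESTAMP columns plus integer parameters drop the old workaround of stripping tzinfo from naive datetimes before insert.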