Merge pull request #1510 from danielaskdd/fix-time-zone

Fix time zone handling and created_at problems
Daniel.y
2025-05-03 22:36:38 +08:00
committed by GitHub
24 changed files with 405 additions and 1433 deletions

View File

@@ -9,7 +9,7 @@ import aiofiles
import shutil
import traceback
import pipmaster as pm
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
@@ -20,6 +20,32 @@ from lightrag.base import DocProcessingStatus, DocStatus
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
# Function to format datetime to ISO format string with timezone information
def format_datetime(dt: Any) -> Optional[str]:
"""Format datetime to ISO format string with timezone information
Args:
dt: Datetime object, string, or None
Returns:
ISO format string with timezone information, or None if input is None
"""
if dt is None:
return None
if isinstance(dt, str):
return dt
# Check if datetime object has timezone information
if isinstance(dt, datetime):
# If datetime object has no timezone info (naive datetime), add UTC timezone
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# Return ISO format string with timezone information
return dt.isoformat()
router = APIRouter(
prefix="/documents",
tags=["documents"],
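A quick sketch of how the new helper behaves (sample values are illustrative, not taken from this PR):

from datetime import datetime, timezone

format_datetime(None)                                # -> None
format_datetime("2025-05-03T14:36:38+00:00")         # strings pass through unchanged
format_datetime(datetime(2025, 5, 3, 14, 36, 38))    # naive -> assumed UTC -> "2025-05-03T14:36:38+00:00"
format_datetime(datetime.now(timezone.utc))          # aware -> ISO string with its own offset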
@@ -207,14 +233,6 @@ Attributes:
class DocStatusResponse(BaseModel):
@staticmethod
def format_datetime(dt: Any) -> Optional[str]:
if dt is None:
return None
if isinstance(dt, str):
return dt
return dt.isoformat()
id: str = Field(description="Document identifier")
content_summary: str = Field(description="Summary of document content")
content_length: int = Field(description="Length of document content in characters")
@@ -300,7 +318,7 @@ class PipelineStatusResponse(BaseModel):
autoscanned: Whether auto-scan has started
busy: Whether the pipeline is currently busy
job_name: Current job name (e.g., indexing files/indexing texts)
job_start: Job start time as ISO format string (optional)
job_start: Job start time as ISO format string with timezone (optional)
docs: Total number of documents to be indexed
batchs: Number of batches for processing documents
cur_batch: Current processing batch
@@ -322,6 +340,12 @@ class PipelineStatusResponse(BaseModel):
history_messages: Optional[List[str]] = None
update_status: Optional[dict] = None
@field_validator("job_start", mode="before")
@classmethod
def parse_job_start(cls, value):
"""Process datetime and return as ISO format string with timezone"""
return format_datetime(value)
class Config:
extra = "allow" # Allow additional fields from the pipeline status
@@ -1188,9 +1212,10 @@ def create_document_routes(
if "history_messages" in status_dict:
status_dict["history_messages"] = list(status_dict["history_messages"])
# Format the job_start time if it exists
if status_dict.get("job_start"):
status_dict["job_start"] = str(status_dict["job_start"])
# Ensure job_start is properly formatted as a string with timezone information
if "job_start" in status_dict and status_dict["job_start"]:
# Use format_datetime to ensure consistent formatting
status_dict["job_start"] = format_datetime(status_dict["job_start"])
return PipelineStatusResponse(**status_dict)
except Exception as e:
@@ -1240,12 +1265,8 @@ def create_document_routes(
content_summary=doc_status.content_summary,
content_length=doc_status.content_length,
status=doc_status.status,
created_at=DocStatusResponse.format_datetime(
doc_status.created_at
),
updated_at=DocStatusResponse.format_datetime(
doc_status.updated_at
),
created_at=format_datetime(doc_status.created_at),
updated_at=format_datetime(doc_status.updated_at),
chunks_count=doc_status.chunks_count,
error=doc_status.error,
metadata=doc_status.metadata,

View File

@@ -114,11 +114,18 @@ class ChromaVectorDBStorage(BaseVectorStorage):
return
try:
import time
current_time = int(time.time())
ids = list(data.keys())
documents = [v["content"] for v in data.values()]
metadatas = [
{k: v for k, v in item.items() if k in self.meta_fields}
or {"_default": "true"}
{
**{k: v for k, v in item.items() if k in self.meta_fields},
"created_at": current_time,
}
or {"_default": "true", "created_at": current_time}
for item in data.values()
]
@@ -183,6 +190,7 @@ class ChromaVectorDBStorage(BaseVectorStorage):
"id": results["ids"][0][i],
"distance": 1 - results["distances"][0][i],
"content": results["documents"][0][i],
"created_at": results["metadatas"][0][i].get("created_at"),
**results["metadatas"][0][i],
}
for i in range(len(results["ids"][0]))
@@ -235,42 +243,6 @@ class ChromaVectorDBStorage(BaseVectorStorage):
logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
raise
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
try:
# Get all records from the collection
# Since ChromaDB doesn't directly support prefix search on IDs,
# we'll get all records and filter in Python
results = self._collection.get(
include=["metadatas", "documents", "embeddings"]
)
matching_records = []
# Filter records where ID starts with the prefix
for i, record_id in enumerate(results["ids"]):
if record_id.startswith(prefix):
matching_records.append(
{
"id": record_id,
"content": results["documents"][i],
"vector": results["embeddings"][i],
**results["metadatas"][i],
}
)
logger.debug(
f"Found {len(matching_records)} records with prefix '{prefix}'"
)
return matching_records
except Exception as e:
logger.error(f"Error during prefix search in ChromaDB: {str(e)}")
raise
@@ -298,6 +270,7 @@ class ChromaVectorDBStorage(BaseVectorStorage):
"id": result["ids"][0],
"vector": result["embeddings"][0],
"content": result["documents"][0],
"created_at": result["metadatas"][0].get("created_at"),
**result["metadatas"][0],
}
except Exception as e:
@@ -331,6 +304,7 @@ class ChromaVectorDBStorage(BaseVectorStorage):
"id": result["ids"][i],
"vector": result["embeddings"][i],
"content": result["documents"][i],
"created_at": result["metadatas"][i].get("created_at"),
**result["metadatas"][i],
}
for i in range(len(result["ids"]))

View File

@@ -107,7 +107,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
if not data:
return
current_time = time.time()
current_time = int(time.time())
# Prepare data for embedding
list_data = []
@@ -385,27 +385,6 @@ class FaissVectorDBStorage(BaseVectorStorage):
return True # Return success
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
matching_records = []
# Search for records with IDs starting with the prefix
for faiss_id, meta in self._id_to_meta.items():
if "__id__" in meta and meta["__id__"].startswith(prefix):
# Create a copy of all metadata and add "id" field
record = {**meta, "id": meta["__id__"]}
matching_records.append(record)
logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'")
return matching_records
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -425,7 +404,11 @@ class FaissVectorDBStorage(BaseVectorStorage):
if not metadata:
return None
return {**metadata, "id": metadata.get("__id__")}
return {
**metadata,
"id": metadata.get("__id__"),
"created_at": metadata.get("__created_at__"),
}
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
"""Get multiple vector data by their IDs
@@ -445,7 +428,13 @@ class FaissVectorDBStorage(BaseVectorStorage):
if fid is not None:
metadata = self._id_to_meta.get(fid, {})
if metadata:
results.append({**metadata, "id": metadata.get("__id__")})
results.append(
{
**metadata,
"id": metadata.get("__id__"),
"created_at": metadata.get("__created_at__"),
}
)
return results
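Both the Faiss and NanoVectorDB stores keep internal dunder keys in their metadata; the accessors now copy them out under public names so callers get the same record shape as the other backends. A minimal sketch, assuming a hypothetical metadata entry:

meta = {"__id__": "chunk-123", "__created_at__": 1746283000, "file_path": "doc.txt"}
record = {**meta, "id": meta.get("__id__"), "created_at": meta.get("__created_at__")}
# record now exposes public "id"/"created_at" alongside the raw dunder keys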

View File

@@ -79,9 +79,14 @@ class MilvusVectorDBStorage(BaseVectorStorage):
if not data:
return
import time
current_time = int(time.time())
list_data: list[dict[str, Any]] = [
{
"id": k,
"created_at": current_time,
**{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
}
for k, v in data.items()
@@ -111,7 +116,7 @@ class MilvusVectorDBStorage(BaseVectorStorage):
collection_name=self.namespace,
data=embedding,
limit=top_k,
output_fields=list(self.meta_fields),
output_fields=list(self.meta_fields) + ["created_at"],
search_params={
"metric_type": "COSINE",
"params": {"radius": self.cosine_better_than_threshold},
@@ -119,7 +124,13 @@ class MilvusVectorDBStorage(BaseVectorStorage):
)
print(results)
return [
{**dp["entity"], "id": dp["id"], "distance": dp["distance"]}
{
**dp["entity"],
"id": dp["id"],
"distance": dp["distance"],
# created_at is requested in output_fields, so it should be a top-level key in the result dict (dp)
"created_at": dp.get("created_at"),
}
for dp in results[0]
]
@@ -211,31 +222,6 @@ class MilvusVectorDBStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
try:
# Use Milvus query with expression to find IDs with the given prefix
expression = f'id like "{prefix}%"'
results = self._client.query(
collection_name=self.namespace,
filter=expression,
output_fields=list(self.meta_fields) + ["id"],
)
logger.debug(f"Found {len(results)} records with prefix '{prefix}'")
return results
except Exception as e:
logger.error(f"Error searching for records with prefix '{prefix}': {e}")
return []
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -250,12 +236,16 @@ class MilvusVectorDBStorage(BaseVectorStorage):
result = self._client.query(
collection_name=self.namespace,
filter=f'id == "{id}"',
output_fields=list(self.meta_fields) + ["id"],
output_fields=list(self.meta_fields) + ["id", "created_at"],
)
if not result or len(result) == 0:
return None
# Ensure the result contains created_at field
if "created_at" not in result[0]:
result[0]["created_at"] = None
return result[0]
except Exception as e:
logger.error(f"Error retrieving vector data for ID {id}: {e}")
@@ -282,9 +272,14 @@ class MilvusVectorDBStorage(BaseVectorStorage):
result = self._client.query(
collection_name=self.namespace,
filter=filter_expr,
output_fields=list(self.meta_fields) + ["id"],
output_fields=list(self.meta_fields) + ["id", "created_at"],
)
# Ensure each result contains created_at field
for item in result:
if "created_at" not in item:
item["created_at"] = None
return result or []
except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}")

View File

@@ -999,9 +999,15 @@ class MongoVectorDBStorage(BaseVectorStorage):
if not data:
return
# Add current time as Unix timestamp
import time
current_time = int(time.time())
list_data = [
{
"_id": k,
"created_at": current_time, # Add created_at field as Unix timestamp
**{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
}
for k, v in data.items()
@@ -1059,9 +1065,14 @@ class MongoVectorDBStorage(BaseVectorStorage):
cursor = self._data.aggregate(pipeline)
results = await cursor.to_list()
# Format and return the results
# Format and return the results with created_at field
return [
{**doc, "id": doc["_id"], "distance": doc.get("score", None)}
{
**doc,
"id": doc["_id"],
"distance": doc.get("score", None),
"created_at": doc.get("created_at"), # Include created_at field
}
for doc in results
]
@@ -1138,28 +1149,6 @@ class MongoVectorDBStorage(BaseVectorStorage):
except PyMongoError as e:
logger.error(f"Error deleting relations for {entity_name}: {str(e)}")
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
try:
# Use MongoDB regex to find documents where _id starts with the prefix
cursor = self._data.find({"_id": {"$regex": f"^{prefix}"}})
matching_records = await cursor.to_list(length=None)
# Format results
results = [{**doc, "id": doc["_id"]} for doc in matching_records]
logger.debug(
f"Found {len(results)} records with prefix '{prefix}' in {self.namespace}"
)
return results
except PyMongoError as e:
logger.error(f"Error searching by prefix in {self.namespace}: {str(e)}")
return []

View File

@@ -89,7 +89,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
if not data:
return
current_time = time.time()
current_time = int(time.time())
list_data = [
{
"__id__": k,
@@ -259,26 +259,6 @@ class NanoVectorDBStorage(BaseVectorStorage):
return True # Return success
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
storage = await self.client_storage
matching_records = []
# Search for records with IDs starting with the prefix
for record in storage["data"]:
if "__id__" in record and record["__id__"].startswith(prefix):
matching_records.append({**record, "id": record["__id__"]})
logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'")
return matching_records
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -291,7 +271,12 @@ class NanoVectorDBStorage(BaseVectorStorage):
client = await self._get_client()
result = client.get([id])
if result:
return result[0]
dp = result[0]
return {
**dp,
"id": dp.get("__id__"),
"created_at": dp.get("__created_at__"),
}
return None
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -307,7 +292,15 @@ class NanoVectorDBStorage(BaseVectorStorage):
return []
client = await self._get_client()
return client.get(ids)
results = client.get(ids)
return [
{
**dp,
"id": dp.get("__id__"),
"created_at": dp.get("__created_at__"),
}
for dp in results
]
async def drop(self) -> dict[str, str]:
"""Drop all vector data from storage and clean up resources

View File

@@ -1,7 +1,8 @@
import asyncio
import json
import os
import time
import datetime
from datetime import timezone
from dataclasses import dataclass, field
from typing import Any, Union, final
import numpy as np
@@ -105,7 +106,61 @@ class PostgreSQLDB:
):
pass
async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time"""
# Tables and columns that need migration
tables_to_migrate = {
"LIGHTRAG_VDB_ENTITY": ["create_time", "update_time"],
"LIGHTRAG_VDB_RELATION": ["create_time", "update_time"],
"LIGHTRAG_DOC_CHUNKS": ["create_time", "update_time"],
}
for table_name, columns in tables_to_migrate.items():
for column_name in columns:
try:
# Check if column exists
check_column_sql = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name.lower()}'
AND column_name = '{column_name}'
"""
column_info = await self.query(check_column_sql)
if not column_info:
logger.warning(
f"Column {table_name}.{column_name} does not exist, skipping migration"
)
continue
# Check column type
data_type = column_info.get("data_type")
if data_type == "timestamp with time zone":
logger.info(
f"Column {table_name}.{column_name} is already timezone-aware, no migration needed"
)
continue
# Execute migration, explicitly specifying UTC timezone for interpreting original data
logger.info(
f"Migrating {table_name}.{column_name} to timezone-aware type"
)
migration_sql = f"""
ALTER TABLE {table_name}
ALTER COLUMN {column_name} TYPE TIMESTAMP(0) WITH TIME ZONE
USING {column_name} AT TIME ZONE 'UTC'
"""
await self.execute(migration_sql)
logger.info(
f"Successfully migrated {table_name}.{column_name} to timezone-aware type"
)
except Exception as e:
# Log error but don't interrupt the process
logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}")
async def check_tables(self):
# First create all tables
for k, v in TABLES.items():
try:
await self.query(f"SELECT 1 FROM {k} LIMIT 1")
@@ -141,6 +196,13 @@ class PostgreSQLDB:
f"PostgreSQL, Failed to create index on table {k}, Got: {e}"
)
# After all tables are created, attempt to migrate timestamp fields
try:
await self._migrate_timestamp_columns()
except Exception as e:
logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
# Don't throw an exception, allow the initialization process to continue
async def query(
self,
sql: str,
@@ -544,7 +606,9 @@ class PGVectorStorage(BaseVectorStorage):
await ClientManager.release_client(self.db)
self.db = None
def _upsert_chunks(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
def _upsert_chunks(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
try:
upsert_sql = SQL_TEMPLATES["upsert_chunk"]
data: dict[str, Any] = {
@@ -556,6 +620,8 @@ class PGVectorStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()),
"file_path": item["file_path"],
"create_time": current_time,
"update_time": current_time,
}
except Exception as e:
logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -563,7 +629,9 @@ class PGVectorStorage(BaseVectorStorage):
return upsert_sql, data
def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
def _upsert_entities(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
upsert_sql = SQL_TEMPLATES["upsert_entity"]
source_id = item["source_id"]
if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -579,10 +647,14 @@ class PGVectorStorage(BaseVectorStorage):
"content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids,
"file_path": item.get("file_path", None),
"create_time": current_time,
"update_time": current_time,
}
return upsert_sql, data
def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
def _upsert_relationships(
self, item: dict[str, Any], current_time: datetime.datetime
) -> tuple[str, dict[str, Any]]:
upsert_sql = SQL_TEMPLATES["upsert_relationship"]
source_id = item["source_id"]
if isinstance(source_id, str) and "<SEP>" in source_id:
@@ -599,6 +671,8 @@ class PGVectorStorage(BaseVectorStorage):
"content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids,
"file_path": item.get("file_path", None),
"create_time": current_time,
"update_time": current_time,
}
return upsert_sql, data
@@ -607,11 +681,11 @@ class PGVectorStorage(BaseVectorStorage):
if not data:
return
current_time = time.time()
# Get current time with UTC timezone
current_time = datetime.datetime.now(timezone.utc)
list_data = [
{
"__id__": k,
"__created_at__": current_time,
**{k1: v1 for k1, v1 in v.items()},
}
for k, v in data.items()
@@ -630,11 +704,11 @@ class PGVectorStorage(BaseVectorStorage):
d["__vector__"] = embeddings[i]
for item in list_data:
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
upsert_sql, data = self._upsert_chunks(item)
upsert_sql, data = self._upsert_chunks(item, current_time)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
upsert_sql, data = self._upsert_entities(item)
upsert_sql, data = self._upsert_entities(item, current_time)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
upsert_sql, data = self._upsert_relationships(item)
upsert_sql, data = self._upsert_relationships(item, current_time)
else:
raise ValueError(f"{self.namespace} is not supported")
@@ -726,41 +800,6 @@ class PGVectorStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error deleting relations for entity {entity_name}: {e}")
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(f"Unknown namespace for prefix search: {self.namespace}")
return []
search_sql = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id LIKE $2"
params = {"workspace": self.db.workspace, "prefix": f"{prefix}%"}
try:
results = await self.db.query(search_sql, params, multirows=True)
logger.debug(f"Found {len(results)} records with prefix '{prefix}'")
# Format results to match the expected return format
formatted_results = []
for record in results:
formatted_record = dict(record)
# Ensure id field is available (for consistency with NanoVectorDB implementation)
if "id" not in formatted_record:
formatted_record["id"] = record["id"]
formatted_results.append(formatted_record)
return formatted_results
except Exception as e:
logger.error(f"Error during prefix search for '{prefix}': {e}")
return []
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -775,7 +814,7 @@ class PGVectorStorage(BaseVectorStorage):
logger.error(f"Unknown namespace for ID lookup: {self.namespace}")
return None
query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id=$2"
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2"
params = {"workspace": self.db.workspace, "id": id}
try:
@@ -805,7 +844,7 @@ class PGVectorStorage(BaseVectorStorage):
return []
ids_str = ",".join([f"'{id}'" for id in ids])
query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})"
params = {"workspace": self.db.workspace}
try:
@@ -992,8 +1031,28 @@ class PGDocStatusStorage(DocStatusStorage):
if not data:
return
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path)
values($1,$2,$3,$4,$5,$6,$7,$8)
def parse_datetime(dt_str):
if dt_str is None:
return None
if isinstance(dt_str, (datetime.date, datetime.datetime)):
# If it's a datetime object without timezone info, remove timezone info
if isinstance(dt_str, datetime.datetime):
# Remove timezone info, return naive datetime object
return dt_str.replace(tzinfo=None)
return dt_str
try:
# Process ISO format string with timezone
dt = datetime.datetime.fromisoformat(dt_str)
# Remove timezone info, return naive datetime object
return dt.replace(tzinfo=None)
except (ValueError, TypeError):
logger.warning(f"Unable to parse datetime string: {dt_str}")
return None
# Modified SQL to include created_at and updated_at in both INSERT and UPDATE operations
# Both fields are updated from the input data in both INSERT and UPDATE cases
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,created_at,updated_at)
values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
on conflict(id,workspace) do update set
content = EXCLUDED.content,
content_summary = EXCLUDED.content_summary,
@@ -1001,8 +1060,13 @@ class PGDocStatusStorage(DocStatusStorage):
chunks_count = EXCLUDED.chunks_count,
status = EXCLUDED.status,
file_path = EXCLUDED.file_path,
updated_at = CURRENT_TIMESTAMP"""
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at"""
for k, v in data.items():
# Remove timezone information, store utc time in db
created_at = parse_datetime(v.get("created_at"))
updated_at = parse_datetime(v.get("updated_at"))
# chunks_count is optional
await self.db.execute(
sql,
@@ -1015,6 +1079,8 @@ class PGDocStatusStorage(DocStatusStorage):
"chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
"status": v["status"],
"file_path": v["file_path"],
"created_at": created_at, # Use the converted datetime object
"updated_at": updated_at, # Use the converted datetime object
},
)
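The nested parse_datetime helper strips the offset before binding, so the naive value written to the timestamptz columns is the UTC wall-clock time produced upstream by datetime.now(timezone.utc). A sketch of the expected behaviour (illustrative input):

parse_datetime("2025-05-03T14:36:38+00:00")   # -> datetime(2025, 5, 3, 14, 36, 38), naive UTC
parse_datetime(None)                          # -> None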
@@ -2194,8 +2260,8 @@ TABLES = {
doc_name VARCHAR(1024),
content TEXT,
meta JSONB,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -2209,8 +2275,8 @@ TABLES = {
content TEXT,
content_vector VECTOR,
file_path VARCHAR(256),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
create_time TIMESTAMP(0) WITH TIME ZONE,
update_time TIMESTAMP(0) WITH TIME ZONE,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -2221,8 +2287,8 @@ TABLES = {
entity_name VARCHAR(255),
content TEXT,
content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
create_time TIMESTAMP(0) WITH TIME ZONE,
update_time TIMESTAMP(0) WITH TIME ZONE,
chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
@@ -2236,8 +2302,8 @@ TABLES = {
target_id VARCHAR(256),
content TEXT,
content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
create_time TIMESTAMP(0) WITH TIME ZONE,
update_time TIMESTAMP(0) WITH TIME ZONE,
chunk_ids VARCHAR(255)[] NULL,
file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
@@ -2265,8 +2331,8 @@ TABLES = {
chunks_count int4 NULL,
status varchar(64) NULL,
file_path TEXT NULL,
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
updated_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -2313,8 +2379,9 @@ SQL_TEMPLATES = {
update_time = CURRENT_TIMESTAMP
""",
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
chunk_order_index, full_doc_id, content, content_vector, file_path,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
@@ -2322,23 +2389,23 @@ SQL_TEMPLATES = {
content = EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
content_vector, chunk_ids, file_path)
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7)
content_vector, chunk_ids, file_path, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9)
ON CONFLICT (workspace,id) DO UPDATE
SET entity_name=EXCLUDED.entity_name,
content=EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path,
update_time=CURRENT_TIMESTAMP
update_time=EXCLUDED.update_time
""",
"upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
target_id, content, content_vector, chunk_ids, file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8)
target_id, content, content_vector, chunk_ids, file_path, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10)
ON CONFLICT (workspace,id) DO UPDATE
SET source_id=EXCLUDED.source_id,
target_id=EXCLUDED.target_id,
@@ -2346,7 +2413,7 @@ SQL_TEMPLATES = {
content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP
update_time = EXCLUDED.update_time
""",
"relationships": """
WITH relevant_chunks AS (
@@ -2354,9 +2421,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT source_id as src_id, target_id as tgt_id
SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at
FROM (
SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_RELATION r
JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
WHERE r.workspace=$1
@@ -2371,9 +2438,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT entity_name FROM
SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
(
SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_VDB_ENTITY e
JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
WHERE e.workspace=$1
@@ -2388,9 +2455,9 @@ SQL_TEMPLATES = {
FROM LIGHTRAG_DOC_CHUNKS
WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
)
SELECT id, content, file_path FROM
SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
(
SELECT id, content, file_path, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
FROM LIGHTRAG_DOC_CHUNKS
WHERE workspace=$1
AND id IN (SELECT chunk_id FROM relevant_chunks)
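EXTRACT(EPOCH FROM create_time)::BIGINT converts the timezone-aware column back into Unix seconds, so the Postgres backend reports created_at in the same unit as the backends that store int(time.time()) directly. The equivalent conversion in Python, with an illustrative value:

from datetime import datetime, timezone

create_time = datetime(2025, 5, 3, 14, 36, 38, tzinfo=timezone.utc)
created_at = int(create_time.timestamp())   # Unix seconds, matching the other vector stores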

View File

@@ -88,9 +88,15 @@ class QdrantVectorDBStorage(BaseVectorStorage):
logger.info(f"Inserting {len(data)} to {self.namespace}")
if not data:
return
import time
current_time = int(time.time())
list_data = [
{
"id": k,
"created_at": current_time,
**{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
}
for k, v in data.items()
@@ -137,7 +143,14 @@ class QdrantVectorDBStorage(BaseVectorStorage):
logger.debug(f"query result: {results}")
return [{**dp.payload, "distance": dp.score} for dp in results]
return [
{
**dp.payload,
"distance": dp.score,
"created_at": dp.payload.get("created_at"),
}
for dp in results
]
async def index_done_callback(self) -> None:
# Qdrant handles persistence automatically
@@ -236,46 +249,6 @@ class QdrantVectorDBStorage(BaseVectorStorage):
except Exception as e:
logger.error(f"Error deleting relations for {entity_name}: {e}")
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
try:
# Use scroll method to find records with IDs starting with the prefix
results = self._client.scroll(
collection_name=self.namespace,
scroll_filter=models.Filter(
must=[
models.FieldCondition(
key="id", match=models.MatchText(text=prefix, prefix=True)
)
]
),
with_payload=True,
with_vectors=False,
limit=1000, # Adjust as needed for your use case
)
# Extract matching points
matching_records = results[0]
# Format the results to match expected return format
formatted_results = [{**point.payload} for point in matching_records]
logger.debug(
f"Found {len(formatted_results)} records with prefix '{prefix}'"
)
return formatted_results
except Exception as e:
logger.error(f"Error searching for prefix '{prefix}': {e}")
return []
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -299,7 +272,12 @@ class QdrantVectorDBStorage(BaseVectorStorage):
if not result:
return None
return result[0].payload
# Ensure the result contains created_at field
payload = result[0].payload
if "created_at" not in payload:
payload["created_at"] = None
return payload
except Exception as e:
logger.error(f"Error retrieving vector data for ID {id}: {e}")
return None
@@ -327,7 +305,15 @@ class QdrantVectorDBStorage(BaseVectorStorage):
with_payload=True,
)
return [point.payload for point in results]
# Ensure each result contains created_at field
payloads = []
for point in results:
payload = point.payload
if "created_at" not in payload:
payload["created_at"] = None
payloads.append(payload)
return payloads
except Exception as e:
logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
return []

View File

@@ -2,7 +2,7 @@ import asyncio
import os
from dataclasses import dataclass, field
from typing import Any, Union, final
import time
import numpy as np
from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
@@ -44,7 +44,13 @@ class TiDB:
logger.error(f"TiDB database error: {e}")
raise
async def _migrate_timestamp_columns(self):
"""Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC"""
# Not implemented yet
pass
async def check_tables(self):
# First create all tables
for k, v in TABLES.items():
try:
await self.query(f"SELECT 1 FROM {k}".format(k=k))
@@ -58,6 +64,13 @@ class TiDB:
logger.error(f"Failed to create table {k} in TiDB database")
logger.error(f"TiDB database error: {e}")
# After all tables are created, try to migrate timestamp fields
try:
await self._migrate_timestamp_columns()
except Exception as e:
logger.error(f"TiDB, Failed to migrate timestamp columns: {e}")
# Don't raise exceptions, allow initialization process to continue
async def query(
self, sql: str, params: dict = None, multirows: bool = False
) -> Union[dict, None]:
@@ -244,6 +257,9 @@ class TiDBKVStorage(BaseKVStorage):
for i, d in enumerate(list_data):
d["__vector__"] = embeddings[i]
# Get current time as UNIX timestamp
current_time = int(time.time())
merge_sql = SQL_TEMPLATES["upsert_chunk"]
data = []
for item in list_data:
@@ -256,6 +272,7 @@ class TiDBKVStorage(BaseKVStorage):
"full_doc_id": item["full_doc_id"],
"content_vector": f"{item['__vector__'].tolist()}",
"workspace": self.db.workspace,
"timestamp": current_time,
}
)
await self.db.execute(merge_sql, data)
@@ -325,7 +342,7 @@ class TiDBKVStorage(BaseKVStorage):
if table_name != "LIGHTRAG_LLM_CACHE":
return False
# 构建MySQL风格的IN查询
# Build MySQL style IN query
modes_list = ", ".join([f"'{mode}'" for mode in modes])
sql = f"""
DELETE FROM {table_name}
@@ -406,7 +423,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
results = await self.db.query(
SQL_TEMPLATES[self.namespace], params=params, multirows=True
)
print("vector search result:", results)
if not results:
return []
return results
@@ -416,14 +432,18 @@ class TiDBVectorDBStorage(BaseVectorStorage):
logger.info(f"Inserting {len(data)} to {self.namespace}")
if not data:
return
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
return
logger.info(f"Inserting {len(data)} vectors to {self.namespace}")
# Get current time as UNIX timestamp
import time
current_time = int(time.time())
list_data = [
{
"id": k,
"timestamp": current_time,
**{k1: v1 for k1, v1 in v.items()},
}
for k, v in data.items()
@@ -440,8 +460,20 @@ class TiDBVectorDBStorage(BaseVectorStorage):
for i, d in enumerate(list_data):
d["content_vector"] = embeddings[i]
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
data = []
if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS):
for item in list_data:
param = {
"id": item["id"],
"content": item["content"],
"tokens": item.get("tokens", 0),
"chunk_order_index": item.get("chunk_order_index", 0),
"full_doc_id": item.get("full_doc_id", ""),
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
await self.db.execute(SQL_TEMPLATES["upsert_chunk"], param)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES):
for item in list_data:
param = {
"id": item["id"],
@@ -449,20 +481,10 @@ class TiDBVectorDBStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
# update entity_id if node inserted by graph_storage_instance before
has = await self.db.query(SQL_TEMPLATES["has_entity"], param)
if has["cnt"] != 0:
await self.db.execute(SQL_TEMPLATES["update_entity"], param)
continue
data.append(param)
if data:
merge_sql = SQL_TEMPLATES["insert_entity"]
await self.db.execute(merge_sql, data)
await self.db.execute(SQL_TEMPLATES["upsert_entity"], param)
elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS):
data = []
for item in list_data:
param = {
"id": item["id"],
@@ -471,17 +493,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": f"{item['content_vector'].tolist()}",
"workspace": self.db.workspace,
"timestamp": item["timestamp"],
}
# update relation_id if node inserted by graph_storage_instance before
has = await self.db.query(SQL_TEMPLATES["has_relationship"], param)
if has["cnt"] != 0:
await self.db.execute(SQL_TEMPLATES["update_relationship"], param)
continue
data.append(param)
if data:
merge_sql = SQL_TEMPLATES["insert_relationship"]
await self.db.execute(merge_sql, data)
await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param)
async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]:
SQL = SQL_TEMPLATES["get_by_status_" + self.namespace]
@@ -573,55 +587,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
except Exception as e:
return {"status": "error", "message": str(e)}
async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
"""Search for records with IDs starting with a specific prefix.
Args:
prefix: The prefix to search for in record IDs
Returns:
List of records with matching ID prefixes
"""
# Determine which table to query based on namespace
if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
sql_template = """
SELECT entity_id as id, name as entity_name, entity_type, description, content
FROM LIGHTRAG_GRAPH_NODES
WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
"""
elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
sql_template = """
SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
keywords, description, content
FROM LIGHTRAG_GRAPH_EDGES
WHERE relation_id LIKE :prefix_pattern AND workspace = :workspace
"""
elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
sql_template = """
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
FROM LIGHTRAG_DOC_CHUNKS
WHERE chunk_id LIKE :prefix_pattern AND workspace = :workspace
"""
else:
logger.warning(
f"Namespace {self.namespace} not supported for prefix search"
)
return []
# Add prefix pattern parameter with % for SQL LIKE
prefix_pattern = f"{prefix}%"
params = {"prefix_pattern": prefix_pattern, "workspace": self.db.workspace}
try:
results = await self.db.query(sql_template, params=params, multirows=True)
logger.debug(
f"Found {len(results) if results else 0} records with prefix '{prefix}'"
)
return results if results else []
except Exception as e:
logger.error(f"Error searching records with prefix '{prefix}': {e}")
return []
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get vector data by its ID
@@ -635,7 +600,8 @@ class TiDBVectorDBStorage(BaseVectorStorage):
# Determine which table to query based on namespace
if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
sql_template = """
SELECT entity_id as id, name as entity_name, entity_type, description, content
SELECT entity_id as id, name as entity_name, entity_type, description, content,
UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_GRAPH_NODES
WHERE entity_id = :entity_id AND workspace = :workspace
"""
@@ -643,14 +609,15 @@ class TiDBVectorDBStorage(BaseVectorStorage):
elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
sql_template = """
SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
keywords, description, content
keywords, description, content, UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_GRAPH_EDGES
WHERE relation_id = :relation_id AND workspace = :workspace
"""
params = {"relation_id": id, "workspace": self.db.workspace}
elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
sql_template = """
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id,
UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_DOC_CHUNKS
WHERE chunk_id = :chunk_id AND workspace = :workspace
"""
@@ -686,20 +653,22 @@ class TiDBVectorDBStorage(BaseVectorStorage):
# Determine which table to query based on namespace
if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
sql_template = f"""
SELECT entity_id as id, name as entity_name, entity_type, description, content
SELECT entity_id as id, name as entity_name, entity_type, description, content,
UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_GRAPH_NODES
WHERE entity_id IN ({ids_str}) AND workspace = :workspace
"""
elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
sql_template = f"""
SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
keywords, description, content
keywords, description, content, UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_GRAPH_EDGES
WHERE relation_id IN ({ids_str}) AND workspace = :workspace
"""
elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
sql_template = f"""
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id,
UNIX_TIMESTAMP(createtime) as created_at
FROM LIGHTRAG_DOC_CHUNKS
WHERE chunk_id IN ({ids_str}) AND workspace = :workspace
"""
@@ -1086,8 +1055,8 @@ TABLES = {
`tokens` INT,
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
UNIQUE KEY (`chunk_id`)
);
"""
@@ -1104,8 +1073,8 @@ TABLES = {
`source_chunk_id` VARCHAR(256),
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
KEY (`entity_id`)
);
"""
@@ -1124,8 +1093,8 @@ TABLES = {
`source_chunk_id` varchar(256),
`content` LONGTEXT,
`content_vector` VECTOR,
`createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
`updatetime` DATETIME DEFAULT NULL,
`createtime` TIMESTAMP,
`updatetime` TIMESTAMP,
KEY (`relation_id`)
);
"""
@@ -1159,25 +1128,25 @@ SQL_TEMPLATES = {
ON DUPLICATE KEY UPDATE content = VALUES(content), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
""",
"upsert_chunk": """
INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace)
VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace)
INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace, createtime, updatetime)
VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content), tokens = VALUES(tokens), chunk_order_index = VALUES(chunk_order_index),
full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = FROM_UNIXTIME(:timestamp)
""",
# SQL for VectorStorage
"entities": """SELECT n.name as entity_name FROM
(SELECT entity_id as id, name, VEC_COSINE_DISTANCE(content_vector,:embedding_string) as distance
"entities": """SELECT n.name as entity_name, UNIX_TIMESTAMP(n.createtime) as created_at FROM
(SELECT entity_id as id, name, createtime, VEC_COSINE_DISTANCE(content_vector,:embedding_string) as distance
FROM LIGHTRAG_GRAPH_NODES WHERE workspace = :workspace) n
WHERE n.distance>:better_than_threshold ORDER BY n.distance DESC LIMIT :top_k
""",
"relationships": """SELECT e.source_name as src_id, e.target_name as tgt_id FROM
(SELECT source_name, target_name, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
"relationships": """SELECT e.source_name as src_id, e.target_name as tgt_id, UNIX_TIMESTAMP(e.createtime) as created_at FROM
(SELECT source_name, target_name, createtime, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
FROM LIGHTRAG_GRAPH_EDGES WHERE workspace = :workspace) e
WHERE e.distance>:better_than_threshold ORDER BY e.distance DESC LIMIT :top_k
""",
"chunks": """SELECT c.id FROM
(SELECT chunk_id as id,VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
"chunks": """SELECT c.id, UNIX_TIMESTAMP(c.createtime) as created_at FROM
(SELECT chunk_id as id, createtime, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace = :workspace) c
WHERE c.distance>:better_than_threshold ORDER BY c.distance DESC LIMIT :top_k
""",
@@ -1187,23 +1156,21 @@ SQL_TEMPLATES = {
"has_relationship": """
SELECT COUNT(id) AS cnt FROM LIGHTRAG_GRAPH_EDGES WHERE source_name = :source_name AND target_name = :target_name AND workspace = :workspace
""",
"update_entity": """
UPDATE LIGHTRAG_GRAPH_NODES SET
entity_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
WHERE workspace = :workspace AND name = :name
"upsert_entity": """
INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace, createtime, updatetime)
VALUES(:id, :name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content),
content_vector = VALUES(content_vector),
updatetime = FROM_UNIXTIME(:timestamp)
""",
"update_relationship": """
UPDATE LIGHTRAG_GRAPH_EDGES SET
relation_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
WHERE workspace = :workspace AND source_name = :source_name AND target_name = :target_name
""",
"insert_entity": """
INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace)
VALUES(:id, :name, :content, :content_vector, :workspace)
""",
"insert_relationship": """
INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace)
VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace)
"upsert_relationship": """
INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace, createtime, updatetime)
VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
ON DUPLICATE KEY UPDATE
content = VALUES(content),
content_vector = VALUES(content_vector),
updatetime = FROM_UNIXTIME(:timestamp)
""",
# SQL for GraphStorage
"get_node": """
@@ -1275,22 +1242,6 @@ SQL_TEMPLATES = {
WHERE (source_name = :source AND target_name = :target)
AND workspace = :workspace
""",
# Search by prefix SQL templates
"search_entity_by_prefix": """
SELECT entity_id as id, name as entity_name, entity_type, description, content
FROM LIGHTRAG_GRAPH_NODES
WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
""",
"search_relationship_by_prefix": """
SELECT relation_id as id, source_name as src_id, target_name as tgt_id, keywords, description, content
FROM LIGHTRAG_GRAPH_EDGES
WHERE relation_id LIKE :prefix_pattern AND workspace = :workspace
""",
"search_chunk_by_prefix": """
SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
FROM LIGHTRAG_DOC_CHUNKS
WHERE chunk_id LIKE :prefix_pattern AND workspace = :workspace
""",
# Drop tables
"drop_specifiy_table_workspace": "DELETE FROM {table_name} WHERE workspace = :workspace",
}

View File

@@ -6,7 +6,7 @@ import configparser
import os
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from functools import partial
from typing import (
Any,
@@ -756,8 +756,8 @@ class LightRAG:
"content": content_data["content"],
"content_summary": get_content_summary(content_data["content"]),
"content_length": len(content_data["content"]),
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"file_path": content_data[
"file_path"
], # Store file path in document status
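Switching to datetime.now(timezone.utc) makes the stored strings carry an explicit offset, which is what format_datetime and the Postgres doc-status path rely on. For comparison (example outputs, not taken from the PR):

from datetime import datetime, timezone

datetime.now().isoformat()              # e.g. "2025-05-03T22:36:38.123456" -- naive, ambiguous
datetime.now(timezone.utc).isoformat()  # e.g. "2025-05-03T14:36:38.123456+00:00" -- explicit UTC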
@@ -840,7 +840,7 @@ class LightRAG:
{
"busy": True,
"job_name": "Default Job",
"job_start": datetime.now().isoformat(),
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 0,
"batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed
@@ -958,7 +958,9 @@ class LightRAG:
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"updated_at": datetime.now(
timezone.utc
).isoformat(),
"file_path": file_path,
}
}
@@ -1018,7 +1020,9 @@ class LightRAG:
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"updated_at": datetime.now(
timezone.utc
).isoformat(),
"file_path": file_path,
}
}
@@ -1053,7 +1057,9 @@ class LightRAG:
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"updated_at": datetime.now(
timezone.utc
).isoformat(),
"file_path": file_path,
}
}

View File

@@ -311,6 +311,7 @@ async def _merge_nodes_then_upsert(
description=description,
source_id=source_id,
file_path=file_path,
created_at=int(time.time()),
)
await knowledge_graph_inst.upsert_node(
entity_name,
@@ -422,6 +423,7 @@ async def _merge_edges_then_upsert(
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
@@ -465,6 +467,7 @@ async def _merge_edges_then_upsert(
keywords=keywords,
source_id=source_id,
file_path=file_path,
created_at=int(time.time()),
),
)
@@ -1455,7 +1458,12 @@ async def _get_node_data(
logger.warning("Some nodes are missing, maybe the storage is damaged")
node_datas = [
{**n, "entity_name": k["entity_name"], "rank": d}
{
**n,
"entity_name": k["entity_name"],
"rank": d,
"created_at": k.get("created_at"),
}
for k, n, d in zip(results, node_datas, node_degrees)
if n is not None
] # what is this text_chunks_db doing. dont remember it in airvx. check the diagram.
@@ -1774,7 +1782,7 @@ async def _get_edge_data(
"src_id": k["src_id"],
"tgt_id": k["tgt_id"],
"rank": edge_degrees_dict.get(pair, k.get("rank", 0)),
"created_at": k.get("__created_at__", None),
"created_at": k.get("created_at", None),
**edge_props,
}
edge_datas.append(combined)
@@ -1820,7 +1828,7 @@ async def _get_edge_data(
]
]
for i, e in enumerate(edge_datas):
created_at = e.get("created_at", "Unknown")
created_at = e.get("created_at", "UNKNOWN")
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
@@ -1847,7 +1855,7 @@ async def _get_edge_data(
["id", "entity", "type", "description", "rank", "created_at", "file_path"]
]
for i, n in enumerate(use_entities):
created_at = n.get("created_at", "Unknown")
created_at = n.get("created_at", "UNKNOWN")
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
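When these integer timestamps are rendered into the context tables, time.strftime with time.localtime formats them in the server's local time zone. A small sketch of that display step, with an illustrative value:

import time

created_at = int(time.time())   # as stored on nodes and edges above
if isinstance(created_at, (int, float)):
    created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# e.g. "2025-05-03 22:36:38" in the server's local zone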