diff --git a/examples/.env.oai.example b/examples/.env.oai.example deleted file mode 100644 index cea86da2..00000000 --- a/examples/.env.oai.example +++ /dev/null @@ -1,7 +0,0 @@ -AZURE_OPENAI_API_VERSION=2024-08-01-preview -AZURE_OPENAI_DEPLOYMENT=gpt-4o -AZURE_OPENAI_API_KEY=myapikey -AZURE_OPENAI_ENDPOINT=https://myendpoint.openai.azure.com - -AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large -AZURE_EMBEDDING_API_VERSION=2023-05-15 diff --git a/examples/batch_eval.py b/examples/batch_eval.py deleted file mode 100644 index a85e1ede..00000000 --- a/examples/batch_eval.py +++ /dev/null @@ -1,108 +0,0 @@ -import re -import json -import jsonlines - -from openai import OpenAI - - -def batch_eval(query_file, result1_file, result2_file, output_file_path): - client = OpenAI() - - with open(query_file, "r") as f: - data = f.read() - - queries = re.findall(r"- Question \d+: (.+)", data) - - with open(result1_file, "r") as f: - answers1 = json.load(f) - answers1 = [i["result"] for i in answers1] - - with open(result2_file, "r") as f: - answers2 = json.load(f) - answers2 = [i["result"] for i in answers2] - - requests = [] - for i, (query, answer1, answer2) in enumerate(zip(queries, answers1, answers2)): - sys_prompt = """ - ---Role--- - You are an expert tasked with evaluating two answers to the same question based on three criteria: **Comprehensiveness**, **Diversity**, and **Empowerment**. - """ - - prompt = f""" - You will evaluate two answers to the same question based on three criteria: **Comprehensiveness**, **Diversity**, and **Empowerment**. - - - **Comprehensiveness**: How much detail does the answer provide to cover all aspects and details of the question? - - **Diversity**: How varied and rich is the answer in providing different perspectives and insights on the question? - - **Empowerment**: How well does the answer help the reader understand and make informed judgments about the topic? - - For each criterion, choose the better answer (either Answer 1 or Answer 2) and explain why. Then, select an overall winner based on these three categories. - - Here is the question: - {query} - - Here are the two answers: - - **Answer 1:** - {answer1} - - **Answer 2:** - {answer2} - - Evaluate both answers using the three criteria listed above and provide detailed explanations for each criterion. 
- - Output your evaluation in the following JSON format: - - {{ - "Comprehensiveness": {{ - "Winner": "[Answer 1 or Answer 2]", - "Explanation": "[Provide explanation here]" - }}, - "Empowerment": {{ - "Winner": "[Answer 1 or Answer 2]", - "Explanation": "[Provide explanation here]" - }}, - "Overall Winner": {{ - "Winner": "[Answer 1 or Answer 2]", - "Explanation": "[Summarize why this answer is the overall winner based on the three criteria]" - }} - }} - """ - - request_data = { - "custom_id": f"request-{i+1}", - "method": "POST", - "url": "/v1/chat/completions", - "body": { - "model": "gpt-4o-mini", - "messages": [ - {"role": "system", "content": sys_prompt}, - {"role": "user", "content": prompt}, - ], - }, - } - - requests.append(request_data) - - with jsonlines.open(output_file_path, mode="w") as writer: - for request in requests: - writer.write(request) - - print(f"Batch API requests written to {output_file_path}") - - batch_input_file = client.files.create( - file=open(output_file_path, "rb"), purpose="batch" - ) - batch_input_file_id = batch_input_file.id - - batch = client.batches.create( - input_file_id=batch_input_file_id, - endpoint="/v1/chat/completions", - completion_window="24h", - metadata={"description": "nightly eval job"}, - ) - - print(f"Batch {batch.id} has been created.") - - -if __name__ == "__main__": - batch_eval() diff --git a/examples/generate_query.py b/examples/generate_query.py deleted file mode 100644 index 705b23d3..00000000 --- a/examples/generate_query.py +++ /dev/null @@ -1,55 +0,0 @@ -from openai import OpenAI - -# os.environ["OPENAI_API_KEY"] = "" - - -def openai_complete_if_cache( - model="gpt-4o-mini", prompt=None, system_prompt=None, history_messages=[], **kwargs -) -> str: - openai_client = OpenAI() - - messages = [] - if system_prompt: - messages.append({"role": "system", "content": system_prompt}) - messages.extend(history_messages) - messages.append({"role": "user", "content": prompt}) - - response = openai_client.chat.completions.create( - model=model, messages=messages, **kwargs - ) - return response.choices[0].message.content - - -if __name__ == "__main__": - description = "" - prompt = f""" - Given the following description of a dataset: - - {description} - - Please identify 5 potential users who would engage with this dataset. For each user, list 5 tasks they would perform with this dataset. Then, for each (user, task) combination, generate 5 questions that require a high-level understanding of the entire dataset. - - Output the results in the following structure: - - User 1: [user description] - - Task 1: [task description] - - Question 1: - - Question 2: - - Question 3: - - Question 4: - - Question 5: - - Task 2: [task description] - ... - - Task 5: [task description] - - User 2: [user description] - ... - - User 5: [user description] - ... 
- """ - - result = openai_complete_if_cache(model="gpt-4o-mini", prompt=prompt) - - file_path = "./queries.txt" - with open(file_path, "w") as file: - file.write(result) - - print(f"Queries written to {file_path}") diff --git a/examples/get_all_edges_nx.py b/examples/get_all_edges_nx.py deleted file mode 100644 index b2c1d84e..00000000 --- a/examples/get_all_edges_nx.py +++ /dev/null @@ -1,40 +0,0 @@ -import networkx as nx - -G = nx.read_graphml("./dickensTestEmbedcall/graph_chunk_entity_relation.graphml") - - -def get_all_edges_and_nodes(G): - # Get all edges and their properties - edges_with_properties = [] - for u, v, data in G.edges(data=True): - edges_with_properties.append( - { - "start": u, - "end": v, - "label": data.get( - "label", "" - ), # Assuming 'label' is used for edge type - "properties": data, - "start_node_properties": G.nodes[u], - "end_node_properties": G.nodes[v], - } - ) - - return edges_with_properties - - -# Example usage -if __name__ == "__main__": - # Assume G is your NetworkX graph loaded from Neo4j - - all_edges = get_all_edges_and_nodes(G) - - # Print all edges and node properties - for edge in all_edges: - print(f"Edge Label: {edge['label']}") - print(f"Edge Properties: {edge['properties']}") - print(f"Start Node: {edge['start']}") - print(f"Start Node Properties: {edge['start_node_properties']}") - print(f"End Node: {edge['end']}") - print(f"End Node Properties: {edge['end_node_properties']}") - print("---") diff --git a/examples/openai_README.md b/examples/openai_README.md deleted file mode 100644 index e2d8d42e..00000000 --- a/examples/openai_README.md +++ /dev/null @@ -1,114 +0,0 @@ - -## API Server Implementation - -LightRAG also provides a FastAPI-based server implementation for RESTful API access to RAG operations. This allows you to run LightRAG as a service and interact with it through HTTP requests. - -### Setting up the API Server -
-1. First, ensure you have the required dependencies: -```bash -pip install fastapi uvicorn pydantic -``` - -2. Set up your environment variables: -```bash -export RAG_DIR="your_index_directory" # Optional: Defaults to "index_default" -export OPENAI_BASE_URL="Your OpenAI API base URL" # Optional: Defaults to "https://api.openai.com/v1" -export OPENAI_API_KEY="Your OpenAI API key" # Required -export LLM_MODEL="Your LLM model" # Optional: Defaults to "gpt-4o-mini" -export EMBEDDING_MODEL="Your embedding model" # Optional: Defaults to "text-embedding-3-large" -``` - -3. Run the API server: -```bash -python examples/lightrag_api_openai_compatible_demo.py -``` - -The server will start on `http://0.0.0.0:8020`.
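Once the server reports it is listening, it can be smoke-tested from Python before building a client on top of it. A minimal sketch, assuming the `requests` package is installed and the defaults above (port `8020`); the `/health` endpoint it calls is documented below:

```python
import requests

# Ping the health endpoint to confirm the API server is reachable.
# Adjust the URL if you changed the host or port at startup.
resp = requests.get("http://127.0.0.1:8020/health")
resp.raise_for_status()
print(resp.json())
```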
- -### API Endpoints - -The API server provides the following endpoints: - -#### 1. Query Endpoint -
-- **URL:** `/query` -- **Method:** POST -- **Body:** -```json -{ - "query": "Your question here", - "mode": "hybrid", // Can be "naive", "local", "global", or "hybrid" - "only_need_context": true // Optional: defaults to false; if true, only the retrieved context is returned instead of the LLM answer -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/query" \ - -H "Content-Type: application/json" \ - -d '{"query": "What are the main themes?", "mode": "hybrid"}'
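The same query can be issued from Python rather than curl. A sketch under the same assumptions (local server, `requests` installed); `only_need_context` is included only to illustrate the optional field:

```python
import requests

# POST a question to the /query endpoint. "mode" accepts
# "naive", "local", "global", or "hybrid".
payload = {
    "query": "What are the main themes?",
    "mode": "hybrid",
    "only_need_context": False,  # True returns the retrieved context instead of the LLM answer
}
resp = requests.post("http://127.0.0.1:8020/query", json=payload)
resp.raise_for_status()
print(resp.json())
```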
- -#### 2. Insert Text Endpoint -
-- **URL:** `/insert` -- **Method:** POST -- **Body:** -```json -{ - "text": "Your text content here" -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/insert" \ - -H "Content-Type: application/json" \ - -d '{"text": "Content to be inserted into RAG"}'
- -#### 3. Insert File Endpoint -
-- **URL:** `/insert_file` -- **Method:** POST -- **Body:** -```json -{ - "file_path": "path/to/your/file.txt" -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/insert_file" \ - -H "Content-Type: application/json" \ - -d '{"file_path": "./book.txt"}'
- -#### 4. Health Check Endpoint -
-- **URL:** `/health` -- **Method:** GET -- **Example:** -```bash -curl -X GET "http://127.0.0.1:8020/health" -```
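Taken together, a typical round trip inserts content and then queries it. A sketch combining the endpoints above, under the same assumptions (local server, `requests` installed):

```python
import requests

BASE_URL = "http://127.0.0.1:8020"

# Insert a piece of text, then ask a question over the indexed content.
insert_resp = requests.post(
    f"{BASE_URL}/insert",
    json={"text": "Content to be inserted into RAG"},
)
insert_resp.raise_for_status()

query_resp = requests.post(
    f"{BASE_URL}/query",
    json={"query": "What are the main themes?", "mode": "hybrid"},
)
query_resp.raise_for_status()
print(query_resp.json())
```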
- -### Configuration - -The API server can be configured using environment variables: -- `RAG_DIR`: Directory for storing the RAG index (default: "index_default") -- API keys and base URLs should be configured in the code for your specific LLM and embedding model providers diff --git a/examples/openai_README_zh.md b/examples/openai_README_zh.md deleted file mode 100644 index 068b2caf..00000000 --- a/examples/openai_README_zh.md +++ /dev/null @@ -1,115 +0,0 @@ - -## API 服务器实现 - -LightRAG also provides a FastAPI-based server implementation for RESTful API access to RAG operations. This allows you to run LightRAG as a service and interact with it through HTTP requests. -LightRAG 还提供基于 FastAPI 的服务器实现,用于对 RAG 操作进行 RESTful API 访问。这允许您将 LightRAG 作为服务运行并通过 HTTP 请求与其交互。 - -### 设置 API 服务器 -
-1. 首先,确保您具有所需的依赖项: -```bash -pip install fastapi uvicorn pydantic -``` - -2. 设置您的环境变量: -```bash -export RAG_DIR="your_index_directory" # Optional: Defaults to "index_default" -export OPENAI_BASE_URL="Your OpenAI API base URL" # Optional: Defaults to "https://api.openai.com/v1" -export OPENAI_API_KEY="Your OpenAI API key" # Required -export LLM_MODEL="Your LLM model" # Optional: Defaults to "gpt-4o-mini" -export EMBEDDING_MODEL="Your embedding model" # Optional: Defaults to "text-embedding-3-large" -``` - -3. 运行API服务器: -```bash -python examples/lightrag_api_openai_compatible_demo.py -``` - -服务器将启动于 `http://0.0.0.0:8020`.
- -### API端点 - -API服务器提供以下端点: - -#### 1. 查询端点 -
-- **URL:** `/query` -- **Method:** POST -- **Body:** -```json -{ - "query": "Your question here", - "mode": "hybrid", // Can be "naive", "local", "global", or "hybrid" - "only_need_context": true // Optional: defaults to false; if true, only the retrieved context is returned instead of the LLM answer -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/query" \ - -H "Content-Type: application/json" \ - -d '{"query": "What are the main themes?", "mode": "hybrid"}'
- -#### 2. 插入文本端点 -
-- **URL:** `/insert` -- **Method:** POST -- **Body:** -```json -{ - "text": "Your text content here" -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/insert" \ - -H "Content-Type: application/json" \ - -d '{"text": "Content to be inserted into RAG"}'
- -#### 3. 插入文件端点 -
-- **URL:** `/insert_file` -- **Method:** POST -- **Body:** -```json -{ - "file_path": "path/to/your/file.txt" -} -``` -- **Example:** -```bash -curl -X POST "http://127.0.0.1:8020/insert_file" \ - -H "Content-Type: application/json" \ - -d '{"file_path": "./book.txt"}'
- -#### 4. 健康检查端点 -
-- **URL:** `/health` -- **Method:** GET -- **Example:** -```bash -curl -X GET "http://127.0.0.1:8020/health" -```
- -### 配置 - -可以使用环境变量配置API服务器: -- `RAG_DIR`: 存放RAG索引的目录 (default: "index_default") -- 应在代码中为您的特定 LLM 和嵌入模型提供商配置 API 密钥和基本 URL diff --git a/examples/test.py b/examples/test.py deleted file mode 100644 index f2456436..00000000 --- a/examples/test.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import gpt_4o_mini_complete -from lightrag.kg.shared_storage import initialize_pipeline_status -######### -# Uncomment the below two lines if running in a jupyter notebook to handle the async nature of rag.insert() -# import nest_asyncio -# nest_asyncio.apply() -######### - -WORKING_DIR = "./dickens" - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=gpt_4o_mini_complete, # Use gpt_4o_mini_complete LLM model - # llm_model_func=gpt_4o_complete # Optionally, use a stronger model - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - - with open("./book.txt", "r", encoding="utf-8") as f: - rag.insert(f.read()) - - # Perform naive search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="naive") - ) - ) - - # Perform local search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="local") - ) - ) - - # Perform global search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="global") - ) - ) - - # Perform hybrid search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="hybrid") - ) - ) - - -if __name__ == "__main__": - main() diff --git a/examples/test_chromadb.py b/examples/test_chromadb.py deleted file mode 100644 index e4e9b698..00000000 --- a/examples/test_chromadb.py +++ /dev/null @@ -1,158 +0,0 @@ -import os -import asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed -from lightrag.utils import EmbeddingFunc -import numpy as np -from lightrag.kg.shared_storage import initialize_pipeline_status - -######### -# Uncomment the below two lines if running in a jupyter notebook to handle the async nature of rag.insert() -# import nest_asyncio -# nest_asyncio.apply() -######### -WORKING_DIR = "./chromadb_test_dir" -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - -# ChromaDB Configuration -CHROMADB_USE_LOCAL_PERSISTENT = False -# Local PersistentClient Configuration -CHROMADB_LOCAL_PATH = os.environ.get( - "CHROMADB_LOCAL_PATH", os.path.join(WORKING_DIR, "chroma_data") -) -# Remote HttpClient Configuration -CHROMADB_HOST = os.environ.get("CHROMADB_HOST", "localhost") -CHROMADB_PORT = int(os.environ.get("CHROMADB_PORT", 8000)) -CHROMADB_AUTH_TOKEN = os.environ.get("CHROMADB_AUTH_TOKEN", "secret-token") -CHROMADB_AUTH_PROVIDER = os.environ.get( - "CHROMADB_AUTH_PROVIDER", "chromadb.auth.token_authn.TokenAuthClientProvider" -) -CHROMADB_AUTH_HEADER = os.environ.get("CHROMADB_AUTH_HEADER", "X-Chroma-Token") - -# Embedding Configuration and Functions -EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "text-embedding-3-large") -EMBEDDING_MAX_TOKEN_SIZE = int(os.environ.get("EMBEDDING_MAX_TOKEN_SIZE", 8192)) - -# ChromaDB requires knowing the dimension of embeddings upfront when -# creating a collection. The embedding dimension is model-specific -# (e.g. 
text-embedding-3-large uses 3072 dimensions) -# we dynamically determine it by running a test embedding -# and then pass it to the ChromaDBStorage class - - -async def embedding_func(texts: list[str]) -> np.ndarray: - return await openai_embed( - texts, - model=EMBEDDING_MODEL, - ) - - -async def get_embedding_dimension(): - test_text = ["This is a test sentence."] - embedding = await embedding_func(test_text) - return embedding.shape[1] - - -async def create_embedding_function_instance(): - # Get embedding dimension - embedding_dimension = await get_embedding_dimension() - # Create embedding function instance - return EmbeddingFunc( - embedding_dim=embedding_dimension, - max_token_size=EMBEDDING_MAX_TOKEN_SIZE, - func=embedding_func, - ) - - -async def initialize_rag(): - embedding_func_instance = await create_embedding_function_instance() - if CHROMADB_USE_LOCAL_PERSISTENT: - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=gpt_4o_mini_complete, - embedding_func=embedding_func_instance, - vector_storage="ChromaVectorDBStorage", - log_level="DEBUG", - embedding_batch_num=32, - vector_db_storage_cls_kwargs={ - "local_path": CHROMADB_LOCAL_PATH, - "collection_settings": { - "hnsw:space": "cosine", - "hnsw:construction_ef": 128, - "hnsw:search_ef": 128, - "hnsw:M": 16, - "hnsw:batch_size": 100, - "hnsw:sync_threshold": 1000, - }, - }, - ) - else: - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=gpt_4o_mini_complete, - embedding_func=embedding_func_instance, - vector_storage="ChromaVectorDBStorage", - log_level="DEBUG", - embedding_batch_num=32, - vector_db_storage_cls_kwargs={ - "host": CHROMADB_HOST, - "port": CHROMADB_PORT, - "auth_token": CHROMADB_AUTH_TOKEN, - "auth_provider": CHROMADB_AUTH_PROVIDER, - "auth_header_name": CHROMADB_AUTH_HEADER, - "collection_settings": { - "hnsw:space": "cosine", - "hnsw:construction_ef": 128, - "hnsw:search_ef": 128, - "hnsw:M": 16, - "hnsw:batch_size": 100, - "hnsw:sync_threshold": 1000, - }, - }, - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - - with open("./book.txt", "r", encoding="utf-8") as f: - rag.insert(f.read()) - - # Perform naive search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="naive") - ) - ) - - # Perform local search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="local") - ) - ) - - # Perform global search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="global") - ) - ) - - # Perform hybrid search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="hybrid") - ) - ) - - -if __name__ == "__main__": - main() diff --git a/examples/test_faiss.py b/examples/test_faiss.py deleted file mode 100644 index febdce14..00000000 --- a/examples/test_faiss.py +++ /dev/null @@ -1,108 +0,0 @@ -import os -import logging -import asyncio -import numpy as np - -from dotenv import load_dotenv -from sentence_transformers import SentenceTransformer - -from openai import AzureOpenAI -from lightrag import LightRAG, QueryParam -from lightrag.utils import EmbeddingFunc -from lightrag.kg.shared_storage import initialize_pipeline_status - -WORKING_DIR = "./dickens" -# Configure Logging -logging.basicConfig(level=logging.INFO) - -# Load environment variables from .env file -load_dotenv() -AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") 
-AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT") -AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") -AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - # Create a client for AzureOpenAI - client = AzureOpenAI( - api_key=AZURE_OPENAI_API_KEY, - api_version=AZURE_OPENAI_API_VERSION, - azure_endpoint=AZURE_OPENAI_ENDPOINT, - ) - - # Build the messages list for the conversation - messages = [] - if system_prompt: - messages.append({"role": "system", "content": system_prompt}) - if history_messages: - messages.extend(history_messages) - messages.append({"role": "user", "content": prompt}) - - # Call the LLM - chat_completion = client.chat.completions.create( - model=AZURE_OPENAI_DEPLOYMENT, - messages=messages, - temperature=kwargs.get("temperature", 0), - top_p=kwargs.get("top_p", 1), - n=kwargs.get("n", 1), - ) - - return chat_completion.choices[0].message.content - - -async def embedding_func(texts: list[str]) -> np.ndarray: - model = SentenceTransformer("all-MiniLM-L6-v2") - embeddings = model.encode(texts, convert_to_numpy=True) - return embeddings - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=384, - max_token_size=8192, - func=embedding_func, - ), - vector_storage="FaissVectorDBStorage", - vector_db_storage_cls_kwargs={ - "cosine_better_than_threshold": 0.2 # Your desired threshold - }, - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - # Insert the custom chunks into LightRAG - book1 = open("./book_1.txt", encoding="utf-8") - book2 = open("./book_2.txt", encoding="utf-8") - - rag.insert([book1.read(), book2.read()]) - - query_text = "What are the main themes?" 
- - print("Result (Naive):") - print(rag.query(query_text, param=QueryParam(mode="naive"))) - - print("\nResult (Local):") - print(rag.query(query_text, param=QueryParam(mode="local"))) - - print("\nResult (Global):") - print(rag.query(query_text, param=QueryParam(mode="global"))) - - print("\nResult (Hybrid):") - print(rag.query(query_text, param=QueryParam(mode="hybrid"))) - - -if __name__ == "__main__": - main() diff --git a/examples/test_neo4j.py b/examples/test_neo4j.py deleted file mode 100644 index 7f620acc..00000000 --- a/examples/test_neo4j.py +++ /dev/null @@ -1,71 +0,0 @@ -import os -import asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import gpt_4o_mini_complete -from lightrag.kg.shared_storage import initialize_pipeline_status - -######### -# Uncomment the below two lines if running in a jupyter notebook to handle the async nature of rag.insert() -# import nest_asyncio -# nest_asyncio.apply() -######### - -WORKING_DIR = "./local_neo4jWorkDir" - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - - -async def initialize_rag(): - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=gpt_4o_mini_complete, # Use gpt_4o_mini_complete LLM model - graph_storage="Neo4JStorage", - log_level="INFO", - # llm_model_func=gpt_4o_complete # Optionally, use a stronger model - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - - with open("./book.txt", "r", encoding="utf-8") as f: - rag.insert(f.read()) - - # Perform naive search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="naive") - ) - ) - - # Perform local search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="local") - ) - ) - - # Perform global search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="global") - ) - ) - - # Perform hybrid search - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="hybrid") - ) - ) - - -if __name__ == "__main__": - main() diff --git a/examples/test_postgres.py b/examples/test_postgres.py deleted file mode 100644 index e1f796c6..00000000 --- a/examples/test_postgres.py +++ /dev/null @@ -1,51 +0,0 @@ -import os -import asyncio -from lightrag.kg.postgres_impl import PGGraphStorage -from lightrag.llm.ollama import ollama_embedding -from lightrag.utils import EmbeddingFunc - -######### -# Uncomment the below two lines if running in a jupyter notebook to handle the async nature of rag.insert() -# import nest_asyncio -# nest_asyncio.apply() -######### - -WORKING_DIR = "./local_neo4jWorkDir" - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - -# AGE -os.environ["AGE_GRAPH_NAME"] = "dickens" - -os.environ["POSTGRES_HOST"] = "localhost" -os.environ["POSTGRES_PORT"] = "15432" -os.environ["POSTGRES_USER"] = "rag" -os.environ["POSTGRES_PASSWORD"] = "rag" -os.environ["POSTGRES_DATABASE"] = "rag" - - -async def main(): - graph_db = PGGraphStorage( - namespace="dickens", - embedding_func=EmbeddingFunc( - embedding_dim=1024, - max_token_size=8192, - func=lambda texts: ollama_embedding( - texts, embed_model="bge-m3", host="http://localhost:11434" - ), - ), - global_config={}, - ) - await graph_db.initialize() - labels = await graph_db.get_all_labels() - print("all labels", labels) - - res = await graph_db.get_knowledge_graph("FEZZIWIG") - print("knowledge graphs", res) - - await 
graph_db.finalize() - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/examples/vram_management_demo.py b/examples/vram_management_demo.py deleted file mode 100644 index 36eb5468..00000000 --- a/examples/vram_management_demo.py +++ /dev/null @@ -1,121 +0,0 @@ -import os -import time -import asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.ollama import ollama_model_complete, ollama_embed -from lightrag.utils import EmbeddingFunc -from lightrag.kg.shared_storage import initialize_pipeline_status - -# Working directory and the directory path for text files -WORKING_DIR = "./dickens" -TEXT_FILES_DIR = "/llm/mt" - -# Create the working directory if it doesn't exist -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - - -async def initialize_rag(): - # Initialize LightRAG - rag = LightRAG( - working_dir=WORKING_DIR, - llm_model_func=ollama_model_complete, - llm_model_name="qwen2.5:3b-instruct-max-context", - embedding_func=EmbeddingFunc( - embedding_dim=768, - max_token_size=8192, - func=lambda texts: ollama_embed(texts, embed_model="nomic-embed-text"), - ), - ) - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -# Read all .txt files from the TEXT_FILES_DIR directory -texts = [] -for filename in os.listdir(TEXT_FILES_DIR): - if filename.endswith(".txt"): - file_path = os.path.join(TEXT_FILES_DIR, filename) - with open(file_path, "r", encoding="utf-8") as file: - texts.append(file.read()) - - -# Batch insert texts into LightRAG with a retry mechanism -def insert_texts_with_retry(rag, texts, retries=3, delay=5): - for _ in range(retries): - try: - rag.insert(texts) - return - except Exception as e: - print( - f"Error occurred during insertion: {e}. Retrying in {delay} seconds..." 
- ) - time.sleep(delay) - raise RuntimeError("Failed to insert texts after multiple retries.") - - -def main(): - # Initialize RAG instance - rag = asyncio.run(initialize_rag()) - - insert_texts_with_retry(rag, texts) - - # Perform different types of queries and handle potential errors - try: - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="naive") - ) - ) - except Exception as e: - print(f"Error performing naive search: {e}") - - try: - print( - rag.query( - "What are the top themes in this story?", param=QueryParam(mode="local") - ) - ) - except Exception as e: - print(f"Error performing local search: {e}") - - try: - print( - rag.query( - "What are the top themes in this story?", - param=QueryParam(mode="global"), - ) - ) - except Exception as e: - print(f"Error performing global search: {e}") - - try: - print( - rag.query( - "What are the top themes in this story?", - param=QueryParam(mode="hybrid"), - ) - ) - except Exception as e: - print(f"Error performing hybrid search: {e}") - - # Function to clear VRAM resources - def clear_vram(): - os.system("sudo nvidia-smi --gpu-reset") - - # Regularly clear VRAM to prevent overflow - clear_vram_interval = 3600 # Clear once every hour - start_time = time.time() - - while True: - current_time = time.time() - if current_time - start_time > clear_vram_interval: - clear_vram() - start_time = current_time - time.sleep(60) # Check the time every minute - - -if __name__ == "__main__": - main() diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 823d7bff..aa5af7e5 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -9,7 +9,7 @@ import aiofiles import shutil import traceback import pipmaster as pm -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Any, Literal from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile @@ -20,6 +20,32 @@ from lightrag.base import DocProcessingStatus, DocStatus from lightrag.api.utils_api import get_combined_auth_dependency from ..config import global_args + +# Function to format datetime to ISO format string with timezone information +def format_datetime(dt: Any) -> Optional[str]: + """Format datetime to ISO format string with timezone information + + Args: + dt: Datetime object, string, or None + + Returns: + ISO format string with timezone information, or None if input is None + """ + if dt is None: + return None + if isinstance(dt, str): + return dt + + # Check if datetime object has timezone information + if isinstance(dt, datetime): + # If datetime object has no timezone info (naive datetime), add UTC timezone + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + + # Return ISO format string with timezone information + return dt.isoformat() + + router = APIRouter( prefix="/documents", tags=["documents"], @@ -207,14 +233,6 @@ Attributes: class DocStatusResponse(BaseModel): - @staticmethod - def format_datetime(dt: Any) -> Optional[str]: - if dt is None: - return None - if isinstance(dt, str): - return dt - return dt.isoformat() - id: str = Field(description="Document identifier") content_summary: str = Field(description="Summary of document content") content_length: int = Field(description="Length of document content in characters") @@ -300,7 +318,7 @@ class PipelineStatusResponse(BaseModel): autoscanned: Whether auto-scan has started busy: Whether the 
pipeline is currently busy job_name: Current job name (e.g., indexing files/indexing texts) - job_start: Job start time as ISO format string (optional) + job_start: Job start time as ISO format string with timezone (optional) docs: Total number of documents to be indexed batchs: Number of batches for processing documents cur_batch: Current processing batch @@ -322,6 +340,12 @@ class PipelineStatusResponse(BaseModel): history_messages: Optional[List[str]] = None update_status: Optional[dict] = None + @field_validator("job_start", mode="before") + @classmethod + def parse_job_start(cls, value): + """Process datetime and return as ISO format string with timezone""" + return format_datetime(value) + class Config: extra = "allow" # Allow additional fields from the pipeline status @@ -1188,9 +1212,10 @@ def create_document_routes( if "history_messages" in status_dict: status_dict["history_messages"] = list(status_dict["history_messages"]) - # Format the job_start time if it exists - if status_dict.get("job_start"): - status_dict["job_start"] = str(status_dict["job_start"]) + # Ensure job_start is properly formatted as a string with timezone information + if "job_start" in status_dict and status_dict["job_start"]: + # Use format_datetime to ensure consistent formatting + status_dict["job_start"] = format_datetime(status_dict["job_start"]) return PipelineStatusResponse(**status_dict) except Exception as e: @@ -1240,12 +1265,8 @@ def create_document_routes( content_summary=doc_status.content_summary, content_length=doc_status.content_length, status=doc_status.status, - created_at=DocStatusResponse.format_datetime( - doc_status.created_at - ), - updated_at=DocStatusResponse.format_datetime( - doc_status.updated_at - ), + created_at=format_datetime(doc_status.created_at), + updated_at=format_datetime(doc_status.updated_at), chunks_count=doc_status.chunks_count, error=doc_status.error, metadata=doc_status.metadata, diff --git a/lightrag/kg/chroma_impl.py b/lightrag/kg/chroma_impl.py index 627cf480..c3927a19 100644 --- a/lightrag/kg/chroma_impl.py +++ b/lightrag/kg/chroma_impl.py @@ -114,11 +114,18 @@ class ChromaVectorDBStorage(BaseVectorStorage): return try: + import time + + current_time = int(time.time()) + ids = list(data.keys()) documents = [v["content"] for v in data.values()] metadatas = [ - {k: v for k, v in item.items() if k in self.meta_fields} - or {"_default": "true"} + { + **{k: v for k, v in item.items() if k in self.meta_fields}, + "created_at": current_time, + } + or {"_default": "true", "created_at": current_time} for item in data.values() ] @@ -183,6 +190,7 @@ class ChromaVectorDBStorage(BaseVectorStorage): "id": results["ids"][0][i], "distance": 1 - results["distances"][0][i], "content": results["documents"][0][i], + "created_at": results["metadatas"][0][i].get("created_at"), **results["metadatas"][0][i], } for i in range(len(results["ids"][0])) @@ -235,42 +243,6 @@ class ChromaVectorDBStorage(BaseVectorStorage): logger.error(f"Error while deleting vectors from {self.namespace}: {e}") raise - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. 
- - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - try: - # Get all records from the collection - # Since ChromaDB doesn't directly support prefix search on IDs, - # we'll get all records and filter in Python - results = self._collection.get( - include=["metadatas", "documents", "embeddings"] - ) - - matching_records = [] - - # Filter records where ID starts with the prefix - for i, record_id in enumerate(results["ids"]): - if record_id.startswith(prefix): - matching_records.append( - { - "id": record_id, - "content": results["documents"][i], - "vector": results["embeddings"][i], - **results["metadatas"][i], - } - ) - - logger.debug( - f"Found {len(matching_records)} records with prefix '{prefix}'" - ) - return matching_records - except Exception as e: logger.error(f"Error during prefix search in ChromaDB: {str(e)}") raise @@ -298,6 +270,7 @@ class ChromaVectorDBStorage(BaseVectorStorage): "id": result["ids"][0], "vector": result["embeddings"][0], "content": result["documents"][0], + "created_at": result["metadatas"][0].get("created_at"), **result["metadatas"][0], } except Exception as e: @@ -331,6 +304,7 @@ class ChromaVectorDBStorage(BaseVectorStorage): "id": result["ids"][i], "vector": result["embeddings"][i], "content": result["documents"][i], + "created_at": result["metadatas"][i].get("created_at"), **result["metadatas"][i], } for i in range(len(result["ids"])) diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index d4e6ad01..f2afde2e 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -107,7 +107,7 @@ class FaissVectorDBStorage(BaseVectorStorage): if not data: return - current_time = time.time() + current_time = int(time.time()) # Prepare data for embedding list_data = [] @@ -385,27 +385,6 @@ class FaissVectorDBStorage(BaseVectorStorage): return True # Return success - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. 
- - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - matching_records = [] - - # Search for records with IDs starting with the prefix - for faiss_id, meta in self._id_to_meta.items(): - if "__id__" in meta and meta["__id__"].startswith(prefix): - # Create a copy of all metadata and add "id" field - record = {**meta, "id": meta["__id__"]} - matching_records.append(record) - - logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'") - return matching_records - async def get_by_id(self, id: str) -> dict[str, Any] | None: """Get vector data by its ID @@ -425,7 +404,11 @@ class FaissVectorDBStorage(BaseVectorStorage): if not metadata: return None - return {**metadata, "id": metadata.get("__id__")} + return { + **metadata, + "id": metadata.get("__id__"), + "created_at": metadata.get("__created_at__"), + } async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: """Get multiple vector data by their IDs @@ -445,7 +428,13 @@ class FaissVectorDBStorage(BaseVectorStorage): if fid is not None: metadata = self._id_to_meta.get(fid, {}) if metadata: - results.append({**metadata, "id": metadata.get("__id__")}) + results.append( + { + **metadata, + "id": metadata.get("__id__"), + "created_at": metadata.get("__created_at__"), + } + ) return results diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 74e0f039..23e178bc 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -79,9 +79,14 @@ class MilvusVectorDBStorage(BaseVectorStorage): if not data: return + import time + + current_time = int(time.time()) + list_data: list[dict[str, Any]] = [ { "id": k, + "created_at": current_time, **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields}, } for k, v in data.items() @@ -111,7 +116,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): collection_name=self.namespace, data=embedding, limit=top_k, - output_fields=list(self.meta_fields), + output_fields=list(self.meta_fields) + ["created_at"], search_params={ "metric_type": "COSINE", "params": {"radius": self.cosine_better_than_threshold}, @@ -119,7 +124,13 @@ class MilvusVectorDBStorage(BaseVectorStorage): ) print(results) return [ - {**dp["entity"], "id": dp["id"], "distance": dp["distance"]} + { + **dp["entity"], + "id": dp["id"], + "distance": dp["distance"], + # created_at is requested in output_fields, so it should be a top-level key in the result dict (dp) + "created_at": dp.get("created_at"), + } for dp in results[0] ] @@ -211,31 +222,6 @@ class MilvusVectorDBStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error while deleting vectors from {self.namespace}: {e}") - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. 
- - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - try: - # Use Milvus query with expression to find IDs with the given prefix - expression = f'id like "{prefix}%"' - results = self._client.query( - collection_name=self.namespace, - filter=expression, - output_fields=list(self.meta_fields) + ["id"], - ) - - logger.debug(f"Found {len(results)} records with prefix '{prefix}'") - return results - - except Exception as e: - logger.error(f"Error searching for records with prefix '{prefix}': {e}") - return [] - async def get_by_id(self, id: str) -> dict[str, Any] | None: """Get vector data by its ID @@ -250,12 +236,16 @@ class MilvusVectorDBStorage(BaseVectorStorage): result = self._client.query( collection_name=self.namespace, filter=f'id == "{id}"', - output_fields=list(self.meta_fields) + ["id"], + output_fields=list(self.meta_fields) + ["id", "created_at"], ) if not result or len(result) == 0: return None + # Ensure the result contains created_at field + if "created_at" not in result[0]: + result[0]["created_at"] = None + return result[0] except Exception as e: logger.error(f"Error retrieving vector data for ID {id}: {e}") @@ -282,9 +272,14 @@ class MilvusVectorDBStorage(BaseVectorStorage): result = self._client.query( collection_name=self.namespace, filter=filter_expr, - output_fields=list(self.meta_fields) + ["id"], + output_fields=list(self.meta_fields) + ["id", "created_at"], ) + # Ensure each result contains created_at field + for item in result: + if "created_at" not in item: + item["created_at"] = None + return result or [] except Exception as e: logger.error(f"Error retrieving vector data for IDs {ids}: {e}") diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index f4b15d60..d49a36b7 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -999,9 +999,15 @@ class MongoVectorDBStorage(BaseVectorStorage): if not data: return + # Add current time as Unix timestamp + import time + + current_time = int(time.time()) + list_data = [ { "_id": k, + "created_at": current_time, # Add created_at field as Unix timestamp **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields}, } for k, v in data.items() @@ -1059,9 +1065,14 @@ class MongoVectorDBStorage(BaseVectorStorage): cursor = self._data.aggregate(pipeline) results = await cursor.to_list() - # Format and return the results + # Format and return the results with created_at field return [ - {**doc, "id": doc["_id"], "distance": doc.get("score", None)} + { + **doc, + "id": doc["_id"], + "distance": doc.get("score", None), + "created_at": doc.get("created_at"), # Include created_at field + } for doc in results ] @@ -1138,28 +1149,6 @@ class MongoVectorDBStorage(BaseVectorStorage): except PyMongoError as e: logger.error(f"Error deleting relations for {entity_name}: {str(e)}") - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. 
- - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - try: - # Use MongoDB regex to find documents where _id starts with the prefix - cursor = self._data.find({"_id": {"$regex": f"^{prefix}"}}) - matching_records = await cursor.to_list(length=None) - - # Format results - results = [{**doc, "id": doc["_id"]} for doc in matching_records] - - logger.debug( - f"Found {len(results)} records with prefix '{prefix}' in {self.namespace}" - ) - return results - except PyMongoError as e: logger.error(f"Error searching by prefix in {self.namespace}: {str(e)}") return [] diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 589dd9ee..fa56a214 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -89,7 +89,7 @@ class NanoVectorDBStorage(BaseVectorStorage): if not data: return - current_time = time.time() + current_time = int(time.time()) list_data = [ { "__id__": k, @@ -259,26 +259,6 @@ class NanoVectorDBStorage(BaseVectorStorage): return True # Return success - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. - - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - storage = await self.client_storage - matching_records = [] - - # Search for records with IDs starting with the prefix - for record in storage["data"]: - if "__id__" in record and record["__id__"].startswith(prefix): - matching_records.append({**record, "id": record["__id__"]}) - - logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'") - return matching_records - async def get_by_id(self, id: str) -> dict[str, Any] | None: """Get vector data by its ID @@ -291,7 +271,12 @@ class NanoVectorDBStorage(BaseVectorStorage): client = await self._get_client() result = client.get([id]) if result: - return result[0] + dp = result[0] + return { + **dp, + "id": dp.get("__id__"), + "created_at": dp.get("__created_at__"), + } return None async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: @@ -307,7 +292,15 @@ class NanoVectorDBStorage(BaseVectorStorage): return [] client = await self._get_client() - return client.get(ids) + results = client.get(ids) + return [ + { + **dp, + "id": dp.get("__id__"), + "created_at": dp.get("__created_at__"), + } + for dp in results + ] async def drop(self) -> dict[str, str]: """Drop all vector data from storage and clean up resources diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index cb302e8c..026d3f6e 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1,7 +1,8 @@ import asyncio import json import os -import time +import datetime +from datetime import timezone from dataclasses import dataclass, field from typing import Any, Union, final import numpy as np @@ -105,7 +106,61 @@ class PostgreSQLDB: ): pass + async def _migrate_timestamp_columns(self): + """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC time""" + # Tables and columns that need migration + tables_to_migrate = { + "LIGHTRAG_VDB_ENTITY": ["create_time", "update_time"], + "LIGHTRAG_VDB_RELATION": ["create_time", "update_time"], + "LIGHTRAG_DOC_CHUNKS": ["create_time", "update_time"], + } + + for table_name, columns in tables_to_migrate.items(): + for column_name in columns: + try: + # Check if column exists + check_column_sql = 
f""" + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = '{table_name.lower()}' + AND column_name = '{column_name}' + """ + + column_info = await self.query(check_column_sql) + if not column_info: + logger.warning( + f"Column {table_name}.{column_name} does not exist, skipping migration" + ) + continue + + # Check column type + data_type = column_info.get("data_type") + if data_type == "timestamp with time zone": + logger.info( + f"Column {table_name}.{column_name} is already timezone-aware, no migration needed" + ) + continue + + # Execute migration, explicitly specifying UTC timezone for interpreting original data + logger.info( + f"Migrating {table_name}.{column_name} to timezone-aware type" + ) + migration_sql = f""" + ALTER TABLE {table_name} + ALTER COLUMN {column_name} TYPE TIMESTAMP(0) WITH TIME ZONE + USING {column_name} AT TIME ZONE 'UTC' + """ + + await self.execute(migration_sql) + logger.info( + f"Successfully migrated {table_name}.{column_name} to timezone-aware type" + ) + except Exception as e: + # Log error but don't interrupt the process + logger.warning(f"Failed to migrate {table_name}.{column_name}: {e}") + async def check_tables(self): + # First create all tables for k, v in TABLES.items(): try: await self.query(f"SELECT 1 FROM {k} LIMIT 1") @@ -141,6 +196,13 @@ class PostgreSQLDB: f"PostgreSQL, Failed to create index on table {k}, Got: {e}" ) + # After all tables are created, attempt to migrate timestamp fields + try: + await self._migrate_timestamp_columns() + except Exception as e: + logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") + # Don't throw an exception, allow the initialization process to continue + async def query( self, sql: str, @@ -544,7 +606,9 @@ class PGVectorStorage(BaseVectorStorage): await ClientManager.release_client(self.db) self.db = None - def _upsert_chunks(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: + def _upsert_chunks( + self, item: dict[str, Any], current_time: datetime.datetime + ) -> tuple[str, dict[str, Any]]: try: upsert_sql = SQL_TEMPLATES["upsert_chunk"] data: dict[str, Any] = { @@ -556,6 +620,8 @@ class PGVectorStorage(BaseVectorStorage): "content": item["content"], "content_vector": json.dumps(item["__vector__"].tolist()), "file_path": item["file_path"], + "create_time": current_time, + "update_time": current_time, } except Exception as e: logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}") @@ -563,7 +629,9 @@ class PGVectorStorage(BaseVectorStorage): return upsert_sql, data - def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: + def _upsert_entities( + self, item: dict[str, Any], current_time: datetime.datetime + ) -> tuple[str, dict[str, Any]]: upsert_sql = SQL_TEMPLATES["upsert_entity"] source_id = item["source_id"] if isinstance(source_id, str) and "" in source_id: @@ -579,10 +647,14 @@ class PGVectorStorage(BaseVectorStorage): "content_vector": json.dumps(item["__vector__"].tolist()), "chunk_ids": chunk_ids, "file_path": item.get("file_path", None), + "create_time": current_time, + "update_time": current_time, } return upsert_sql, data - def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]: + def _upsert_relationships( + self, item: dict[str, Any], current_time: datetime.datetime + ) -> tuple[str, dict[str, Any]]: upsert_sql = SQL_TEMPLATES["upsert_relationship"] source_id = item["source_id"] if isinstance(source_id, str) and "" in source_id: @@ -599,6 +671,8 @@ class 
PGVectorStorage(BaseVectorStorage): "content_vector": json.dumps(item["__vector__"].tolist()), "chunk_ids": chunk_ids, "file_path": item.get("file_path", None), + "create_time": current_time, + "update_time": current_time, } return upsert_sql, data @@ -607,11 +681,11 @@ class PGVectorStorage(BaseVectorStorage): if not data: return - current_time = time.time() + # Get current time with UTC timezone + current_time = datetime.datetime.now(timezone.utc) list_data = [ { "__id__": k, - "__created_at__": current_time, **{k1: v1 for k1, v1 in v.items()}, } for k, v in data.items() @@ -630,11 +704,11 @@ class PGVectorStorage(BaseVectorStorage): d["__vector__"] = embeddings[i] for item in list_data: if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS): - upsert_sql, data = self._upsert_chunks(item) + upsert_sql, data = self._upsert_chunks(item, current_time) elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES): - upsert_sql, data = self._upsert_entities(item) + upsert_sql, data = self._upsert_entities(item, current_time) elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS): - upsert_sql, data = self._upsert_relationships(item) + upsert_sql, data = self._upsert_relationships(item, current_time) else: raise ValueError(f"{self.namespace} is not supported") @@ -726,41 +800,6 @@ class PGVectorStorage(BaseVectorStorage): except Exception as e: logger.error(f"Error deleting relations for entity {entity_name}: {e}") - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. - - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - table_name = namespace_to_table_name(self.namespace) - if not table_name: - logger.error(f"Unknown namespace for prefix search: {self.namespace}") - return [] - - search_sql = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id LIKE $2" - params = {"workspace": self.db.workspace, "prefix": f"{prefix}%"} - - try: - results = await self.db.query(search_sql, params, multirows=True) - logger.debug(f"Found {len(results)} records with prefix '{prefix}'") - - # Format results to match the expected return format - formatted_results = [] - for record in results: - formatted_record = dict(record) - # Ensure id field is available (for consistency with NanoVectorDB implementation) - if "id" not in formatted_record: - formatted_record["id"] = record["id"] - formatted_results.append(formatted_record) - - return formatted_results - except Exception as e: - logger.error(f"Error during prefix search for '{prefix}': {e}") - return [] - async def get_by_id(self, id: str) -> dict[str, Any] | None: """Get vector data by its ID @@ -775,7 +814,7 @@ class PGVectorStorage(BaseVectorStorage): logger.error(f"Unknown namespace for ID lookup: {self.namespace}") return None - query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id=$2" + query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id=$2" params = {"workspace": self.db.workspace, "id": id} try: @@ -805,7 +844,7 @@ class PGVectorStorage(BaseVectorStorage): return [] ids_str = ",".join([f"'{id}'" for id in ids]) - query = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})" + query = f"SELECT *, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM {table_name} WHERE workspace=$1 AND id IN ({ids_str})" params = {"workspace": self.db.workspace} try: @@ -992,8 +1031,28 @@ class 
PGDocStatusStorage(DocStatusStorage): if not data: return - sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path) - values($1,$2,$3,$4,$5,$6,$7,$8) + def parse_datetime(dt_str): + if dt_str is None: + return None + if isinstance(dt_str, (datetime.date, datetime.datetime)): + # If it's a datetime object, strip any timezone info + if isinstance(dt_str, datetime.datetime): + # Remove timezone info, return naive datetime object + return dt_str.replace(tzinfo=None) + return dt_str + try: + # Process ISO format string with timezone + dt = datetime.datetime.fromisoformat(dt_str) + # Remove timezone info, return naive datetime object + return dt.replace(tzinfo=None) + except (ValueError, TypeError): + logger.warning(f"Unable to parse datetime string: {dt_str}") + return None + + # Modified SQL to include created_at and updated_at in both INSERT and UPDATE operations + # Both fields are updated from the input data in both INSERT and UPDATE cases + sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path,created_at,updated_at) + values($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) on conflict(id,workspace) do update set content = EXCLUDED.content, content_summary = EXCLUDED.content_summary, @@ -1001,8 +1060,13 @@ class PGDocStatusStorage(DocStatusStorage): chunks_count = EXCLUDED.chunks_count, status = EXCLUDED.status, file_path = EXCLUDED.file_path, - updated_at = CURRENT_TIMESTAMP""" + created_at = EXCLUDED.created_at, + updated_at = EXCLUDED.updated_at""" for k, v in data.items(): + # Remove timezone information, store UTC time in db + created_at = parse_datetime(v.get("created_at")) + updated_at = parse_datetime(v.get("updated_at")) + # chunks_count is optional await self.db.execute( sql, { @@ -1015,6 +1079,8 @@ class PGDocStatusStorage(DocStatusStorage): "chunks_count": v["chunks_count"] if "chunks_count" in v else -1, "status": v["status"], "file_path": v["file_path"], + "created_at": created_at, # Use the converted datetime object + "updated_at": updated_at, # Use the converted datetime object }, ) @@ -2194,8 +2260,8 @@ TABLES = { doc_name VARCHAR(1024), content TEXT, meta JSONB, - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - update_time TIMESTAMP, + create_time TIMESTAMP(0), + update_time TIMESTAMP(0), CONSTRAINT LIGHTRAG_DOC_FULL_PK PRIMARY KEY (workspace, id) )""" }, @@ -2209,8 +2275,8 @@ TABLES = { content TEXT, content_vector VECTOR, file_path VARCHAR(256), - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - update_time TIMESTAMP, + create_time TIMESTAMP(0) WITH TIME ZONE, + update_time TIMESTAMP(0) WITH TIME ZONE, CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id) )""" }, @@ -2221,8 +2287,8 @@ TABLES = { entity_name VARCHAR(255), content TEXT, content_vector VECTOR, - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - update_time TIMESTAMP, + create_time TIMESTAMP(0) WITH TIME ZONE, + update_time TIMESTAMP(0) WITH TIME ZONE, chunk_ids VARCHAR(255)[] NULL, file_path TEXT NULL, CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id) @@ -2236,8 +2302,8 @@ TABLES = { target_id VARCHAR(256), content TEXT, content_vector VECTOR, - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - update_time TIMESTAMP, + create_time TIMESTAMP(0) WITH TIME ZONE, + update_time TIMESTAMP(0) WITH TIME ZONE, chunk_ids VARCHAR(255)[] NULL, file_path TEXT NULL, CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id) @@ -2265,8 +2331,8 @@ TABLES = {
chunks_count int4 NULL, status varchar(64) NULL, file_path TEXT NULL, - created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL, - updated_at timestamp DEFAULT CURRENT_TIMESTAMP NULL, + created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL, + updated_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP NULL, CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id) )""" }, @@ -2313,8 +2379,9 @@ SQL_TEMPLATES = { update_time = CURRENT_TIMESTAMP """, "upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens, - chunk_order_index, full_doc_id, content, content_vector, file_path) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + chunk_order_index, full_doc_id, content, content_vector, file_path, + create_time, update_time) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (workspace,id) DO UPDATE SET tokens=EXCLUDED.tokens, chunk_order_index=EXCLUDED.chunk_order_index, @@ -2322,23 +2389,23 @@ SQL_TEMPLATES = { content = EXCLUDED.content, content_vector=EXCLUDED.content_vector, file_path=EXCLUDED.file_path, - update_time = CURRENT_TIMESTAMP + update_time = EXCLUDED.update_time """, # SQL for VectorStorage "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content, - content_vector, chunk_ids, file_path) - VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7) + content_vector, chunk_ids, file_path, create_time, update_time) + VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7, $8, $9) ON CONFLICT (workspace,id) DO UPDATE SET entity_name=EXCLUDED.entity_name, content=EXCLUDED.content, content_vector=EXCLUDED.content_vector, chunk_ids=EXCLUDED.chunk_ids, file_path=EXCLUDED.file_path, - update_time=CURRENT_TIMESTAMP + update_time=EXCLUDED.update_time """, "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id, - target_id, content, content_vector, chunk_ids, file_path) - VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8) + target_id, content, content_vector, chunk_ids, file_path, create_time, update_time) + VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8, $9, $10) ON CONFLICT (workspace,id) DO UPDATE SET source_id=EXCLUDED.source_id, target_id=EXCLUDED.target_id, @@ -2346,7 +2413,7 @@ SQL_TEMPLATES = { content_vector=EXCLUDED.content_vector, chunk_ids=EXCLUDED.chunk_ids, file_path=EXCLUDED.file_path, - update_time = CURRENT_TIMESTAMP + update_time = EXCLUDED.update_time """, "relationships": """ WITH relevant_chunks AS ( @@ -2354,9 +2421,9 @@ SQL_TEMPLATES = { FROM LIGHTRAG_DOC_CHUNKS WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[]) ) - SELECT source_id as src_id, target_id as tgt_id + SELECT source_id as src_id, target_id as tgt_id, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM ( - SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance + SELECT r.id, r.source_id, r.target_id, r.create_time, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance FROM LIGHTRAG_VDB_RELATION r JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids) WHERE r.workspace=$1 @@ -2371,9 +2438,9 @@ SQL_TEMPLATES = { FROM LIGHTRAG_DOC_CHUNKS WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[]) ) - SELECT entity_name FROM + SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM ( - SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance + SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance FROM 
@@ -2371,9 +2438,9 @@ SQL_TEMPLATES = {
            FROM LIGHTRAG_DOC_CHUNKS
            WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
        )
-       SELECT entity_name FROM
+       SELECT entity_name, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
        (
-           SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
+           SELECT e.id, e.entity_name, e.create_time, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
            FROM
                LIGHTRAG_VDB_ENTITY e
                JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
                WHERE e.workspace=$1
@@ -2388,9 +2455,9 @@ SQL_TEMPLATES = {
            FROM LIGHTRAG_DOC_CHUNKS
            WHERE $2::varchar[] IS NULL OR full_doc_id = ANY($2::varchar[])
        )
-       SELECT id, content, file_path FROM
+       SELECT id, content, file_path, EXTRACT(EPOCH FROM create_time)::BIGINT as created_at FROM
        (
-           SELECT id, content, file_path, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
+           SELECT id, content, file_path, create_time, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
            FROM LIGHTRAG_DOC_CHUNKS
            WHERE workspace=$1
            AND id IN (SELECT chunk_id FROM relevant_chunks)
diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index e0d0c9c3..885a23ca 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -88,9 +88,15 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         logger.info(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return
+
+        import time
+
+        current_time = int(time.time())
+
         list_data = [
             {
                 "id": k,
+                "created_at": current_time,
                 **{k1: v1 for k1, v1 in v.items() if k1 in self.meta_fields},
             }
             for k, v in data.items()
@@ -137,7 +143,14 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         logger.debug(f"query result: {results}")

-        return [{**dp.payload, "distance": dp.score} for dp in results]
+        return [
+            {
+                **dp.payload,
+                "distance": dp.score,
+                "created_at": dp.payload.get("created_at"),
+            }
+            for dp in results
+        ]

     async def index_done_callback(self) -> None:
         # Qdrant handles persistence automatically
@@ -236,46 +249,6 @@ class QdrantVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error deleting relations for {entity_name}: {e}")
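The Qdrant change stamps a UNIX epoch into each point's payload at write time, so query results can surface created_at without any schema change. A hedged sketch of the same pattern with qdrant-client's classic API (collection name and vectors are illustrative):

    import time
    from qdrant_client import QdrantClient, models

    client = QdrantClient(":memory:")  # local mode, no server needed
    client.recreate_collection(
        "demo",
        vectors_config=models.VectorParams(size=4, distance=models.Distance.COSINE),
    )
    client.upsert(
        "demo",
        points=[
            models.PointStruct(
                id=1,
                vector=[0.1, 0.2, 0.3, 0.4],
                payload={"created_at": int(time.time())},
            )
        ],
    )
    hit = client.search("demo", query_vector=[0.1, 0.2, 0.3, 0.4], limit=1)[0]
    print({**hit.payload, "distance": hit.score})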
-    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
-        """Search for records with IDs starting with a specific prefix.
-
-        Args:
-            prefix: The prefix to search for in record IDs
-
-        Returns:
-            List of records with matching ID prefixes
-        """
-        try:
-            # Use scroll method to find records with IDs starting with the prefix
-            results = self._client.scroll(
-                collection_name=self.namespace,
-                scroll_filter=models.Filter(
-                    must=[
-                        models.FieldCondition(
-                            key="id", match=models.MatchText(text=prefix, prefix=True)
-                        )
-                    ]
-                ),
-                with_payload=True,
-                with_vectors=False,
-                limit=1000,  # Adjust as needed for your use case
-            )
-
-            # Extract matching points
-            matching_records = results[0]
-
-            # Format the results to match expected return format
-            formatted_results = [{**point.payload} for point in matching_records]
-
-            logger.debug(
-                f"Found {len(formatted_results)} records with prefix '{prefix}'"
-            )
-            return formatted_results
-
-        except Exception as e:
-            logger.error(f"Error searching for prefix '{prefix}': {e}")
-            return []
-
     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         """Get vector data by its ID
@@ -299,7 +272,12 @@ class QdrantVectorDBStorage(BaseVectorStorage):
             if not result:
                 return None

-            return result[0].payload
+            # Ensure the result contains created_at field
+            payload = result[0].payload
+            if "created_at" not in payload:
+                payload["created_at"] = None
+
+            return payload
         except Exception as e:
             logger.error(f"Error retrieving vector data for ID {id}: {e}")
             return None
@@ -327,7 +305,15 @@ class QdrantVectorDBStorage(BaseVectorStorage):
                 with_payload=True,
             )

-            return [point.payload for point in results]
+            # Ensure each result contains created_at field
+            payloads = []
+            for point in results:
+                payload = point.payload
+                if "created_at" not in payload:
+                    payload["created_at"] = None
+                payloads.append(payload)
+
+            return payloads
         except Exception as e:
             logger.error(f"Error retrieving vector data for IDs {ids}: {e}")
             return []
diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py
index fb52b744..5eac42b5 100644
--- a/lightrag/kg/tidb_impl.py
+++ b/lightrag/kg/tidb_impl.py
@@ -2,7 +2,7 @@ import asyncio
 import os
 from dataclasses import dataclass, field
 from typing import Any, Union, final
-
+import time
 import numpy as np

 from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
@@ -44,7 +44,13 @@ class TiDB:
             logger.error(f"TiDB database error: {e}")
             raise

+    async def _migrate_timestamp_columns(self):
+        """Migrate timestamp columns in tables to timezone-aware types, assuming original data is in UTC"""
+        # Not implemented yet
+        pass
+
     async def check_tables(self):
+        # First create all tables
         for k, v in TABLES.items():
             try:
                 await self.query(f"SELECT 1 FROM {k}".format(k=k))
@@ -58,6 +64,13 @@ class TiDB:
                 logger.error(f"Failed to create table {k} in TiDB database")
                 logger.error(f"TiDB database error: {e}")

+        # After all tables are created, try to migrate timestamp fields
+        try:
+            await self._migrate_timestamp_columns()
+        except Exception as e:
+            logger.error(f"TiDB: failed to migrate timestamp columns: {e}")
+            # Don't raise exceptions, allow initialization process to continue
+
     async def query(
         self, sql: str, params: dict = None, multirows: bool = False
     ) -> Union[dict, None]:
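_migrate_timestamp_columns is stubbed out above. One plausible shape for it, should it be filled in later; the ALTER statements below are an assumption, not part of this patch, and an `execute` coroutine is assumed to exist alongside `query` on the TiDB client:

    async def _migrate_timestamp_columns(self):
        """Hypothetical sketch: widen legacy DATETIME columns to TIMESTAMP, table by table."""
        for table in ("LIGHTRAG_DOC_CHUNKS", "LIGHTRAG_GRAPH_NODES", "LIGHTRAG_GRAPH_EDGES"):
            for column in ("createtime", "updatetime"):
                try:
                    await self.execute(
                        f"ALTER TABLE {table} MODIFY COLUMN {column} TIMESTAMP NULL"
                    )
                except Exception as e:
                    logger.warning(f"Skipping {table}.{column} migration: {e}")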
f"{item['__vector__'].tolist()}", "workspace": self.db.workspace, + "timestamp": current_time, } ) await self.db.execute(merge_sql, data) @@ -325,7 +342,7 @@ class TiDBKVStorage(BaseKVStorage): if table_name != "LIGHTRAG_LLM_CACHE": return False - # 构建MySQL风格的IN查询 + # Build MySQL style IN query modes_list = ", ".join([f"'{mode}'" for mode in modes]) sql = f""" DELETE FROM {table_name} @@ -406,7 +423,6 @@ class TiDBVectorDBStorage(BaseVectorStorage): results = await self.db.query( SQL_TEMPLATES[self.namespace], params=params, multirows=True ) - print("vector search result:", results) if not results: return [] return results @@ -416,14 +432,18 @@ class TiDBVectorDBStorage(BaseVectorStorage): logger.info(f"Inserting {len(data)} to {self.namespace}") if not data: return - if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS): - return logger.info(f"Inserting {len(data)} vectors to {self.namespace}") + # Get current time as UNIX timestamp + import time + + current_time = int(time.time()) + list_data = [ { "id": k, + "timestamp": current_time, **{k1: v1 for k1, v1 in v.items()}, } for k, v in data.items() @@ -440,8 +460,20 @@ class TiDBVectorDBStorage(BaseVectorStorage): for i, d in enumerate(list_data): d["content_vector"] = embeddings[i] - if is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES): - data = [] + if is_namespace(self.namespace, NameSpace.VECTOR_STORE_CHUNKS): + for item in list_data: + param = { + "id": item["id"], + "content": item["content"], + "tokens": item.get("tokens", 0), + "chunk_order_index": item.get("chunk_order_index", 0), + "full_doc_id": item.get("full_doc_id", ""), + "content_vector": f"{item['content_vector'].tolist()}", + "workspace": self.db.workspace, + "timestamp": item["timestamp"], + } + await self.db.execute(SQL_TEMPLATES["upsert_chunk"], param) + elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_ENTITIES): for item in list_data: param = { "id": item["id"], @@ -449,20 +481,10 @@ class TiDBVectorDBStorage(BaseVectorStorage): "content": item["content"], "content_vector": f"{item['content_vector'].tolist()}", "workspace": self.db.workspace, + "timestamp": item["timestamp"], } - # update entity_id if node inserted by graph_storage_instance before - has = await self.db.query(SQL_TEMPLATES["has_entity"], param) - if has["cnt"] != 0: - await self.db.execute(SQL_TEMPLATES["update_entity"], param) - continue - - data.append(param) - if data: - merge_sql = SQL_TEMPLATES["insert_entity"] - await self.db.execute(merge_sql, data) - + await self.db.execute(SQL_TEMPLATES["upsert_entity"], param) elif is_namespace(self.namespace, NameSpace.VECTOR_STORE_RELATIONSHIPS): - data = [] for item in list_data: param = { "id": item["id"], @@ -471,17 +493,9 @@ class TiDBVectorDBStorage(BaseVectorStorage): "content": item["content"], "content_vector": f"{item['content_vector'].tolist()}", "workspace": self.db.workspace, + "timestamp": item["timestamp"], } - # update relation_id if node inserted by graph_storage_instance before - has = await self.db.query(SQL_TEMPLATES["has_relationship"], param) - if has["cnt"] != 0: - await self.db.execute(SQL_TEMPLATES["update_relationship"], param) - continue - - data.append(param) - if data: - merge_sql = SQL_TEMPLATES["insert_relationship"] - await self.db.execute(merge_sql, data) + await self.db.execute(SQL_TEMPLATES["upsert_relationship"], param) async def get_by_status(self, status: str) -> Union[list[dict[str, Any]], None]: SQL = SQL_TEMPLATES["get_by_status_" + self.namespace] @@ -573,55 +587,6 @@ class 
@@ -573,55 +587,6 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             return {"status": "error", "message": str(e)}

-    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
-        """Search for records with IDs starting with a specific prefix.
-
-        Args:
-            prefix: The prefix to search for in record IDs
-
-        Returns:
-            List of records with matching ID prefixes
-        """
-        # Determine which table to query based on namespace
-        if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
-            sql_template = """
-                SELECT entity_id as id, name as entity_name, entity_type, description, content
-                FROM LIGHTRAG_GRAPH_NODES
-                WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
-            """
-        elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
-            sql_template = """
-                SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
-                       keywords, description, content
-                FROM LIGHTRAG_GRAPH_EDGES
-                WHERE relation_id LIKE :prefix_pattern AND workspace = :workspace
-            """
-        elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
-            sql_template = """
-                SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
-                FROM LIGHTRAG_DOC_CHUNKS
-                WHERE chunk_id LIKE :prefix_pattern AND workspace = :workspace
-            """
-        else:
-            logger.warning(
-                f"Namespace {self.namespace} not supported for prefix search"
-            )
-            return []
-
-        # Add prefix pattern parameter with % for SQL LIKE
-        prefix_pattern = f"{prefix}%"
-        params = {"prefix_pattern": prefix_pattern, "workspace": self.db.workspace}
-
-        try:
-            results = await self.db.query(sql_template, params=params, multirows=True)
-            logger.debug(
-                f"Found {len(results) if results else 0} records with prefix '{prefix}'"
-            )
-            return results if results else []
-        except Exception as e:
-            logger.error(f"Error searching records with prefix '{prefix}': {e}")
-            return []
-
     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         """Get vector data by its ID
@@ -635,7 +600,8 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         # Determine which table to query based on namespace
         if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
             sql_template = """
-                SELECT entity_id as id, name as entity_name, entity_type, description, content
+                SELECT entity_id as id, name as entity_name, entity_type, description, content,
+                       UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_GRAPH_NODES
                 WHERE entity_id = :entity_id AND workspace = :workspace
             """
@@ -643,14 +609,15 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
             sql_template = """
                 SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
-                       keywords, description, content
+                       keywords, description, content, UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_GRAPH_EDGES
                 WHERE relation_id = :relation_id AND workspace = :workspace
             """
             params = {"relation_id": id, "workspace": self.db.workspace}
         elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
             sql_template = """
-                SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
+                SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id,
+                       UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_DOC_CHUNKS
                 WHERE chunk_id = :chunk_id AND workspace = :workspace
             """
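get_by_id now returns created_at as integer epoch seconds, since UNIX_TIMESTAMP evaluates server-side. A small sketch of consuming such a row (the row contents are hypothetical):

    import datetime

    row = {"id": "chunk-1", "created_at": 1714571696}  # illustrative result
    if row.get("created_at") is not None:
        dt = datetime.datetime.fromtimestamp(row["created_at"], tz=datetime.timezone.utc)
        print(dt.isoformat())  # an unambiguous ISO-8601 UTC string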
@@ -686,20 +653,22 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         # Determine which table to query based on namespace
         if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
             sql_template = f"""
-                SELECT entity_id as id, name as entity_name, entity_type, description, content
+                SELECT entity_id as id, name as entity_name, entity_type, description, content,
+                       UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_GRAPH_NODES
                 WHERE entity_id IN ({ids_str}) AND workspace = :workspace
             """
         elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
             sql_template = f"""
                 SELECT relation_id as id, source_name as src_id, target_name as tgt_id,
-                       keywords, description, content
+                       keywords, description, content, UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_GRAPH_EDGES
                 WHERE relation_id IN ({ids_str}) AND workspace = :workspace
             """
         elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
             sql_template = f"""
-                SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
+                SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id,
+                       UNIX_TIMESTAMP(createtime) as created_at
                 FROM LIGHTRAG_DOC_CHUNKS
                 WHERE chunk_id IN ({ids_str}) AND workspace = :workspace
             """
@@ -1086,8 +1055,8 @@ TABLES = {
             `tokens` INT,
             `content` LONGTEXT,
             `content_vector` VECTOR,
-            `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-            `updatetime` DATETIME DEFAULT NULL,
+            `createtime` TIMESTAMP,
+            `updatetime` TIMESTAMP,
             UNIQUE KEY (`chunk_id`)
         );
         """
@@ -1104,8 +1073,8 @@ TABLES = {
             `source_chunk_id` VARCHAR(256),
             `content` LONGTEXT,
             `content_vector` VECTOR,
-            `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-            `updatetime` DATETIME DEFAULT NULL,
+            `createtime` TIMESTAMP,
+            `updatetime` TIMESTAMP,
             KEY (`entity_id`)
         );
         """
@@ -1124,8 +1093,8 @@ TABLES = {
             `source_chunk_id` varchar(256),
             `content` LONGTEXT,
             `content_vector` VECTOR,
-            `createtime` DATETIME DEFAULT CURRENT_TIMESTAMP,
-            `updatetime` DATETIME DEFAULT NULL,
+            `createtime` TIMESTAMP,
+            `updatetime` TIMESTAMP,
             KEY (`relation_id`)
         );
         """
@@ -1159,25 +1128,25 @@ SQL_TEMPLATES = {
         ON DUPLICATE KEY UPDATE content = VALUES(content), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
     """,
     "upsert_chunk": """
-        INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace)
-        VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace)
+        INSERT INTO LIGHTRAG_DOC_CHUNKS(chunk_id, content, tokens, chunk_order_index, full_doc_id, content_vector, workspace, createtime, updatetime)
+        VALUES (:id, :content, :tokens, :chunk_order_index, :full_doc_id, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
         ON DUPLICATE KEY UPDATE
         content = VALUES(content), tokens = VALUES(tokens), chunk_order_index = VALUES(chunk_order_index),
-        full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = CURRENT_TIMESTAMP
+        full_doc_id = VALUES(full_doc_id), content_vector = VALUES(content_vector), workspace = VALUES(workspace), updatetime = FROM_UNIXTIME(:timestamp)
     """,
     # SQL for VectorStorage
-    "entities": """SELECT n.name as entity_name FROM
-        (SELECT entity_id as id, name, VEC_COSINE_DISTANCE(content_vector,:embedding_string) as distance
+    "entities": """SELECT n.name as entity_name, UNIX_TIMESTAMP(n.createtime) as created_at FROM
+        (SELECT entity_id as id, name, createtime, VEC_COSINE_DISTANCE(content_vector,:embedding_string) as distance
         FROM LIGHTRAG_GRAPH_NODES WHERE workspace = :workspace) n
         WHERE n.distance>:better_than_threshold ORDER BY n.distance DESC LIMIT :top_k
     """,
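FROM_UNIXTIME(:timestamp) on the write path and UNIX_TIMESTAMP(createtime) on the read path are inverses at second precision, so the epoch a caller writes is the epoch it reads back. Sketched in Python for intuition (TiDB evaluates the real functions server-side):

    import datetime

    epoch = 1714571696
    dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)  # ~ FROM_UNIXTIME
    assert int(dt.timestamp()) == epoch                                    # ~ UNIX_TIMESTAMP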
-    "relationships": """SELECT e.source_name as src_id, e.target_name as tgt_id FROM
-        (SELECT source_name, target_name, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
+    "relationships": """SELECT e.source_name as src_id, e.target_name as tgt_id, UNIX_TIMESTAMP(e.createtime) as created_at FROM
+        (SELECT source_name, target_name, createtime, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
         FROM LIGHTRAG_GRAPH_EDGES WHERE workspace = :workspace) e
         WHERE e.distance>:better_than_threshold ORDER BY e.distance DESC LIMIT :top_k
     """,
-    "chunks": """SELECT c.id FROM
-        (SELECT chunk_id as id,VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
+    "chunks": """SELECT c.id, UNIX_TIMESTAMP(c.createtime) as created_at FROM
+        (SELECT chunk_id as id, createtime, VEC_COSINE_DISTANCE(content_vector, :embedding_string) as distance
         FROM LIGHTRAG_DOC_CHUNKS WHERE workspace = :workspace) c
         WHERE c.distance>:better_than_threshold ORDER BY c.distance DESC LIMIT :top_k
     """,
@@ -1187,23 +1156,21 @@ SQL_TEMPLATES = {
     "has_relationship": """
        SELECT COUNT(id) AS cnt FROM LIGHTRAG_GRAPH_EDGES WHERE source_name = :source_name AND target_name = :target_name AND workspace = :workspace
     """,
-    "update_entity": """
-        UPDATE LIGHTRAG_GRAPH_NODES SET
-            entity_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
-        WHERE workspace = :workspace AND name = :name
+    "upsert_entity": """
+        INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace, createtime, updatetime)
+        VALUES(:id, :name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
+        ON DUPLICATE KEY UPDATE
+        content = VALUES(content),
+        content_vector = VALUES(content_vector),
+        updatetime = FROM_UNIXTIME(:timestamp)
     """,
-    "update_relationship": """
-        UPDATE LIGHTRAG_GRAPH_EDGES SET
-            relation_id = :id, content = :content, content_vector = :content_vector, updatetime = CURRENT_TIMESTAMP
-        WHERE workspace = :workspace AND source_name = :source_name AND target_name = :target_name
-    """,
-    "insert_entity": """
-        INSERT INTO LIGHTRAG_GRAPH_NODES(entity_id, name, content, content_vector, workspace)
-        VALUES(:id, :name, :content, :content_vector, :workspace)
-    """,
-    "insert_relationship": """
-        INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace)
-        VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace)
+    "upsert_relationship": """
+        INSERT INTO LIGHTRAG_GRAPH_EDGES(relation_id, source_name, target_name, content, content_vector, workspace, createtime, updatetime)
+        VALUES(:id, :source_name, :target_name, :content, :content_vector, :workspace, FROM_UNIXTIME(:timestamp), FROM_UNIXTIME(:timestamp))
+        ON DUPLICATE KEY UPDATE
+        content = VALUES(content),
+        content_vector = VALUES(content_vector),
+        updatetime = FROM_UNIXTIME(:timestamp)
     """,
     # SQL for GraphStorage
     "get_node": """
@@ -1275,22 +1242,6 @@ SQL_TEMPLATES = {
         WHERE (source_name = :source AND target_name = :target) AND workspace = :workspace
     """,
-    # Search by prefix SQL templates
-    "search_entity_by_prefix": """
-        SELECT entity_id as id, name as entity_name, entity_type, description, content
-        FROM LIGHTRAG_GRAPH_NODES
-        WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
-    """,
-    "search_relationship_by_prefix": """
-        SELECT relation_id as id, source_name as src_id, target_name as tgt_id, keywords, description, content
-        FROM LIGHTRAG_GRAPH_EDGES
-        WHERE relation_id LIKE :prefix_pattern AND workspace = :workspace
-    """,
-    "search_chunk_by_prefix": """
-        SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
-        FROM LIGHTRAG_DOC_CHUNKS
-        WHERE chunk_id LIKE :prefix_pattern AND workspace = :workspace
-    """,
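Note that the ON DUPLICATE KEY clauses above deliberately omit createtime: the first insert sets both columns from the batch timestamp, while a later upsert of the same key advances only updatetime, so createtime keeps recording when the row was first seen. Illustrated with hypothetical values:

    # first call:  INSERT  -> createtime = updatetime = FROM_UNIXTIME(1714571000)
    # second call: UPDATE  -> updatetime = FROM_UNIXTIME(1714575000); createtime
    #              is untouched because it is absent from the UPDATE list
    first = {"id": "rel-1", "timestamp": 1714571000}
    second = {"id": "rel-1", "timestamp": 1714575000}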
    # Drop tables
    "drop_specifiy_table_workspace": "DELETE FROM {table_name} WHERE workspace = :workspace",
}
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 7a79da31..c16a7b68 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -6,7 +6,7 @@ import configparser
 import os
 import warnings
 from dataclasses import asdict, dataclass, field
-from datetime import datetime
+from datetime import datetime, timezone
 from functools import partial
 from typing import (
     Any,
@@ -756,8 +756,8 @@ class LightRAG:
                 "content": content_data["content"],
                 "content_summary": get_content_summary(content_data["content"]),
                 "content_length": len(content_data["content"]),
-                "created_at": datetime.now().isoformat(),
-                "updated_at": datetime.now().isoformat(),
+                "created_at": datetime.now(timezone.utc).isoformat(),
+                "updated_at": datetime.now(timezone.utc).isoformat(),
                 "file_path": content_data[
                     "file_path"
                 ],  # Store file path in document status
@@ -840,7 +840,7 @@ class LightRAG:
                 {
                     "busy": True,
                     "job_name": "Default Job",
-                    "job_start": datetime.now().isoformat(),
+                    "job_start": datetime.now(timezone.utc).isoformat(),
                     "docs": 0,
                     "batchs": 0,  # Total number of files to be processed
                     "cur_batch": 0,  # Number of files already processed
@@ -958,7 +958,9 @@ class LightRAG:
                             "content_summary": status_doc.content_summary,
                             "content_length": status_doc.content_length,
                             "created_at": status_doc.created_at,
-                            "updated_at": datetime.now().isoformat(),
+                            "updated_at": datetime.now(
+                                timezone.utc
+                            ).isoformat(),
                             "file_path": file_path,
                         }
                     }
@@ -1018,7 +1020,9 @@ class LightRAG:
                             "content_summary": status_doc.content_summary,
                             "content_length": status_doc.content_length,
                             "created_at": status_doc.created_at,
-                            "updated_at": datetime.now().isoformat(),
+                            "updated_at": datetime.now(
+                                timezone.utc
+                            ).isoformat(),
                             "file_path": file_path,
                         }
                     }
@@ -1053,7 +1057,9 @@ class LightRAG:
                             "content_summary": status_doc.content_summary,
                             "content_length": status_doc.content_length,
                             "created_at": status_doc.created_at,
-                            "updated_at": datetime.now().isoformat(),
+                            "updated_at": datetime.now(
+                                timezone.utc
+                            ).isoformat(),
                             "file_path": file_path,
                         }
                     }
diff --git a/lightrag/operate.py b/lightrag/operate.py
index d82965e2..373d39bf 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -311,6 +311,7 @@ async def _merge_nodes_then_upsert(
         description=description,
         source_id=source_id,
         file_path=file_path,
+        created_at=int(time.time()),
     )
     await knowledge_graph_inst.upsert_node(
         entity_name,
@@ -422,6 +423,7 @@ async def _merge_edges_then_upsert(
                     "description": description,
                     "entity_type": "UNKNOWN",
                     "file_path": file_path,
+                    "created_at": int(time.time()),
                 },
             )
@@ -465,6 +467,7 @@ async def _merge_edges_then_upsert(
             keywords=keywords,
             source_id=source_id,
             file_path=file_path,
+            created_at=int(time.time()),
         ),
     )
@@ -1455,7 +1458,12 @@ async def _get_node_data(
         logger.warning("Some nodes are missing, maybe the storage is damaged")

     node_datas = [
-        {**n, "entity_name": k["entity_name"], "rank": d}
+        {
+            **n,
+            "entity_name": k["entity_name"],
+            "rank": d,
+            "created_at": k.get("created_at"),
+        }
         for k, n, d in zip(results, node_datas, node_degrees)
         if n is not None
     ]  # what is this text_chunks_db doing. dont remember it in airvx. check the diagram.
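The switch to datetime.now(timezone.utc) matters because naive now() encodes the server's local wall clock with no offset, which downstream parsers cannot normalize; the aware form is unambiguous. A quick stdlib-only illustration (the sample values are indicative):

    from datetime import datetime, timezone

    naive = datetime.now().isoformat()              # e.g. '2025-05-01T14:34:56' (no offset)
    aware = datetime.now(timezone.utc).isoformat()  # e.g. '2025-05-01T12:34:56+00:00'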
@@ -1774,7 +1782,7 @@ async def _get_edge_data(
                 "src_id": k["src_id"],
                 "tgt_id": k["tgt_id"],
                 "rank": edge_degrees_dict.get(pair, k.get("rank", 0)),
-                "created_at": k.get("__created_at__", None),
+                "created_at": k.get("created_at", None),
                 **edge_props,
             }
             edge_datas.append(combined)
@@ -1820,7 +1828,7 @@ async def _get_edge_data(
         ]
     ]
     for i, e in enumerate(edge_datas):
-        created_at = e.get("created_at", "Unknown")
+        created_at = e.get("created_at", "UNKNOWN")
         # Convert timestamp to readable format
         if isinstance(created_at, (int, float)):
             created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
@@ -1847,7 +1855,7 @@ async def _get_edge_data(
         ["id", "entity", "type", "description", "rank", "created_at", "file_path"]
     ]
     for i, n in enumerate(use_entities):
-        created_at = n.get("created_at", "Unknown")
+        created_at = n.get("created_at", "UNKNOWN")
         # Convert timestamp to readable format
         if isinstance(created_at, (int, float)):
             created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
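When created_at reaches the prompt-building code as epoch seconds, it is rendered with the same two lines the hunks above use; extracted here for clarity (the sample epoch is illustrative):

    import time

    created_at = 1714571696  # epoch seconds from the storage layer
    if isinstance(created_at, (int, float)):
        created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))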
diff --git a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx
index 5e902ecf..2a2c5d93 100644
--- a/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx
+++ b/lightrag_webui/src/components/documents/PipelineStatusDialog.tsx
@@ -158,7 +158,16 @@ export default function PipelineStatusDialog({
             {t('documentPanel.pipelineStatus.jobName')}: {status?.job_name || '-'}
-            {t('documentPanel.pipelineStatus.startTime')}: {status?.job_start ? new Date(status.job_start).toLocaleString() : '-'}
+            {t('documentPanel.pipelineStatus.startTime')}: {status?.job_start
+              ? new Date(status.job_start).toLocaleString(undefined, {
+                  year: 'numeric',
+                  month: 'numeric',
+                  day: 'numeric',
+                  hour: 'numeric',
+                  minute: 'numeric',
+                  second: 'numeric'
+                })
+              : '-'}
             {t('documentPanel.pipelineStatus.progress')}: {status ? `${status.cur_batch}/${status.batchs} ${t('documentPanel.pipelineStatus.unit')}` : '-'}