diff --git a/README-zh.md b/README-zh.md index 784fd1f2..6b8cd550 100644 --- a/README-zh.md +++ b/README-zh.md @@ -11,7 +11,6 @@ - [X] [2024.12.31]🎯📢LightRAG现在支持[通过文档ID删除](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#delete)。 - [X] [2024.11.25]🎯📢LightRAG现在支持无缝集成[自定义知识图谱](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#insert-custom-kg),使用户能够用自己的领域专业知识增强系统。 - [X] [2024.11.19]🎯📢LightRAG的综合指南现已在[LearnOpenCV](https://learnopencv.com/lightrag)上发布。非常感谢博客作者。 -- [X] [2024.11.12]🎯📢LightRAG现在支持[Oracle Database 23ai的所有存储类型(KV、向量和图)](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_oracle_demo.py)。 - [X] [2024.11.11]🎯📢LightRAG现在支持[通过实体名称删除实体](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#delete)。 - [X] [2024.11.09]🎯📢推出[LightRAG Gui](https://lightrag-gui.streamlit.app),允许您插入、查询、可视化和下载LightRAG知识。 - [X] [2024.11.04]🎯📢现在您可以[使用Neo4J进行存储](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#using-neo4j-for-storage)。 @@ -1037,9 +1036,10 @@ rag.clear_cache(modes=["local"]) | **参数** | **类型** | **说明** | **默认值** | |--------------|----------|-----------------|-------------| | **working_dir** | `str` | 存储缓存的目录 | `lightrag_cache+timestamp` | -| **kv_storage** | `str` | 文档和文本块的存储类型。支持的类型:`JsonKVStorage`、`OracleKVStorage` | `JsonKVStorage` | -| **vector_storage** | `str` | 嵌入向量的存储类型。支持的类型:`NanoVectorDBStorage`、`OracleVectorDBStorage` | `NanoVectorDBStorage` | -| **graph_storage** | `str` | 图边和节点的存储类型。支持的类型:`NetworkXStorage`、`Neo4JStorage`、`OracleGraphStorage` | `NetworkXStorage` | +| **kv_storage** | `str` | Storage type for documents and text chunks. Supported types: `JsonKVStorage`,`PGKVStorage`,`RedisKVStorage`,`MongoKVStorage`,`TiDBKVStorage` | `JsonKVStorage` | +| **vector_storage** | `str` | Storage type for embedding vectors. Supported types: `NanoVectorDBStorage`,`PGVectorStorage`,`MilvusVectorDBStorage`,`ChromaVectorDBStorage`,`FaissVectorDBStorage`,`TiDBVectorDBStorage`,`MongoVectorDBStorage`,`QdrantVectorDBStorage` | `NanoVectorDBStorage` | +| **graph_storage** | `str` | Storage type for graph edges and nodes. Supported types: `NetworkXStorage`,`Neo4JStorage`,`PGGraphStorage`,`AGEStorage`,`GremlinStorage` | `NetworkXStorage` | +| **doc_status_storage** | `str` | Storage type for document processing status. Supported types: `JsonDocStatusStorage`,`PGDocStatusStorage`,`MongoDocStatusStorage` | `JsonDocStatusStorage` | | **chunk_token_size** | `int` | 拆分文档时每个块的最大令牌大小 | `1200` | | **chunk_overlap_token_size** | `int` | 拆分文档时两个块之间的重叠令牌大小 | `100` | | **tiktoken_model_name** | `str` | 用于计算令牌数的Tiktoken编码器的模型名称 | `gpt-4o-mini` | diff --git a/README.md b/README.md index d3716416..4b3fbd01 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,6 @@ - [X] [2024.12.31]🎯📢LightRAG now supports [deletion by document ID](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#delete). - [X] [2024.11.25]🎯📢LightRAG now supports seamless integration of [custom knowledge graphs](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#insert-custom-kg), empowering users to enhance the system with their own domain expertise. - [X] [2024.11.19]🎯📢A comprehensive guide to LightRAG is now available on [LearnOpenCV](https://learnopencv.com/lightrag). Many thanks to the blog author. -- [X] [2024.11.12]🎯📢LightRAG now supports [Oracle Database 23ai for all storage types (KV, vector, and graph)](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_oracle_demo.py). - [X] [2024.11.11]🎯📢LightRAG now supports [deleting entities by their names](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#delete). 
- [X] [2024.11.09]🎯📢Introducing the [LightRAG Gui](https://lightrag-gui.streamlit.app), which allows you to insert, query, visualize, and download LightRAG knowledge. - [X] [2024.11.04]🎯📢You can now [use Neo4J for Storage](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#using-neo4j-for-storage). @@ -1065,9 +1064,10 @@ Valid modes are: | **Parameter** | **Type** | **Explanation** | **Default** | |--------------|----------|-----------------|-------------| | **working_dir** | `str` | Directory where the cache will be stored | `lightrag_cache+timestamp` | -| **kv_storage** | `str` | Storage type for documents and text chunks. Supported types: `JsonKVStorage`, `OracleKVStorage` | `JsonKVStorage` | -| **vector_storage** | `str` | Storage type for embedding vectors. Supported types: `NanoVectorDBStorage`, `OracleVectorDBStorage` | `NanoVectorDBStorage` | -| **graph_storage** | `str` | Storage type for graph edges and nodes. Supported types: `NetworkXStorage`, `Neo4JStorage`, `OracleGraphStorage` | `NetworkXStorage` | +| **kv_storage** | `str` | Storage type for documents and text chunks. Supported types: `JsonKVStorage`,`PGKVStorage`,`RedisKVStorage`,`MongoKVStorage`,`TiDBKVStorage` | `JsonKVStorage` | +| **vector_storage** | `str` | Storage type for embedding vectors. Supported types: `NanoVectorDBStorage`,`PGVectorStorage`,`MilvusVectorDBStorage`,`ChromaVectorDBStorage`,`FaissVectorDBStorage`,`TiDBVectorDBStorage`,`MongoVectorDBStorage`,`QdrantVectorDBStorage` | `NanoVectorDBStorage` | +| **graph_storage** | `str` | Storage type for graph edges and nodes. Supported types: `NetworkXStorage`,`Neo4JStorage`,`PGGraphStorage`,`AGEStorage`,`GremlinStorage` | `NetworkXStorage` | +| **doc_status_storage** | `str` | Storage type for document processing status. 
Supported types: `JsonDocStatusStorage`,`PGDocStatusStorage`,`MongoDocStatusStorage` | `JsonDocStatusStorage` | | **chunk_token_size** | `int` | Maximum token size per chunk when splitting documents | `1200` | | **chunk_overlap_token_size** | `int` | Overlap token size between two chunks when splitting documents | `100` | | **tiktoken_model_name** | `str` | Model name for the Tiktoken encoder used to calculate token numbers | `gpt-4o-mini` | diff --git a/config.ini.example b/config.ini.example index 3041611e..8829566f 100644 --- a/config.ini.example +++ b/config.ini.example @@ -13,15 +13,6 @@ uri=redis://localhost:6379/1 [qdrant] uri = http://localhost:16333 -[oracle] -dsn = localhost:1521/XEPDB1 -user = your_username -password = your_password -config_dir = /path/to/oracle/config -wallet_location = /path/to/wallet # 可选 -wallet_password = your_wallet_password # 可选 -workspace = default # 可选,默认为default - [tidb] host = localhost port = 4000 diff --git a/env.example b/env.example index fb603279..af4c7f65 100644 --- a/env.example +++ b/env.example @@ -109,16 +109,6 @@ LIGHTRAG_VECTOR_STORAGE=NanoVectorDBStorage LIGHTRAG_GRAPH_STORAGE=NetworkXStorage LIGHTRAG_DOC_STATUS_STORAGE=JsonDocStatusStorage -### Oracle Database Configuration -ORACLE_DSN=localhost:1521/XEPDB1 -ORACLE_USER=your_username -ORACLE_PASSWORD='your_password' -ORACLE_CONFIG_DIR=/path/to/oracle/config -#ORACLE_WALLET_LOCATION=/path/to/wallet -#ORACLE_WALLET_PASSWORD='your_password' -### separating all data from difference Lightrag instances(deprecating) -#ORACLE_WORKSPACE=default - ### TiDB Configuration TIDB_HOST=localhost TIDB_PORT=4000 diff --git a/examples/lightrag_api_oracle_demo.py b/examples/lightrag_api_oracle_demo.py deleted file mode 100644 index 3a82f479..00000000 --- a/examples/lightrag_api_oracle_demo.py +++ /dev/null @@ -1,267 +0,0 @@ -from fastapi import FastAPI, HTTPException, File, UploadFile -from fastapi import Query -from contextlib import asynccontextmanager -from pydantic import BaseModel -from typing import Optional, Any - -import sys -import os - - -from pathlib import Path - -import asyncio -import nest_asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import openai_complete_if_cache, openai_embed -from lightrag.utils import EmbeddingFunc -import numpy as np -from lightrag.kg.shared_storage import initialize_pipeline_status - - -print(os.getcwd()) -script_directory = Path(__file__).resolve().parent.parent -sys.path.append(os.path.abspath(script_directory)) - - -# Apply nest_asyncio to solve event loop issues -nest_asyncio.apply() - -DEFAULT_RAG_DIR = "index_default" - - -# We use OpenAI compatible API to call LLM on Oracle Cloud -# More docs here https://github.com/jin38324/OCI_GenAI_access_gateway -BASE_URL = "http://xxx.xxx.xxx.xxx:8088/v1/" -APIKEY = "ocigenerativeai" - -# Configure working directory -WORKING_DIR = os.environ.get("RAG_DIR", f"{DEFAULT_RAG_DIR}") -print(f"WORKING_DIR: {WORKING_DIR}") -LLM_MODEL = os.environ.get("LLM_MODEL", "cohere.command-r-plus-08-2024") -print(f"LLM_MODEL: {LLM_MODEL}") -EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "cohere.embed-multilingual-v3.0") -print(f"EMBEDDING_MODEL: {EMBEDDING_MODEL}") -EMBEDDING_MAX_TOKEN_SIZE = int(os.environ.get("EMBEDDING_MAX_TOKEN_SIZE", 512)) -print(f"EMBEDDING_MAX_TOKEN_SIZE: {EMBEDDING_MAX_TOKEN_SIZE}") - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - -os.environ["ORACLE_USER"] = "" -os.environ["ORACLE_PASSWORD"] = "" -os.environ["ORACLE_DSN"] = "" -os.environ["ORACLE_CONFIG_DIR"] = 
"path_to_config_dir" -os.environ["ORACLE_WALLET_LOCATION"] = "path_to_wallet_location" -os.environ["ORACLE_WALLET_PASSWORD"] = "wallet_password" -os.environ["ORACLE_WORKSPACE"] = "company" - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - return await openai_complete_if_cache( - LLM_MODEL, - prompt, - system_prompt=system_prompt, - history_messages=history_messages, - api_key=APIKEY, - base_url=BASE_URL, - **kwargs, - ) - - -async def embedding_func(texts: list[str]) -> np.ndarray: - return await openai_embed( - texts, - model=EMBEDDING_MODEL, - api_key=APIKEY, - base_url=BASE_URL, - ) - - -async def get_embedding_dim(): - test_text = ["This is a test sentence."] - embedding = await embedding_func(test_text) - embedding_dim = embedding.shape[1] - return embedding_dim - - -async def init(): - # Detect embedding dimension - embedding_dimension = await get_embedding_dim() - print(f"Detected embedding dimension: {embedding_dimension}") - # Create Oracle DB connection - # The `config` parameter is the connection configuration of Oracle DB - # More docs here https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html - # We storage data in unified tables, so we need to set a `workspace` parameter to specify which docs we want to store and query - # Below is an example of how to connect to Oracle Autonomous Database on Oracle Cloud - - # Initialize LightRAG - # We use Oracle DB as the KV/vector/graph storage - rag = LightRAG( - enable_llm_cache=False, - working_dir=WORKING_DIR, - chunk_token_size=512, - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=embedding_dimension, - max_token_size=512, - func=embedding_func, - ), - graph_storage="OracleGraphStorage", - kv_storage="OracleKVStorage", - vector_storage="OracleVectorDBStorage", - ) - - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -# Extract and Insert into LightRAG storage -# with open("./dickens/book.txt", "r", encoding="utf-8") as f: -# await rag.ainsert(f.read()) - -# # Perform search in different modes -# modes = ["naive", "local", "global", "hybrid"] -# for mode in modes: -# print("="*20, mode, "="*20) -# print(await rag.aquery("这篇文档是关于什么内容的?", param=QueryParam(mode=mode))) -# print("-"*100, "\n") - -# Data models - - -class QueryRequest(BaseModel): - query: str - mode: str = "hybrid" - only_need_context: bool = False - only_need_prompt: bool = False - - -class DataRequest(BaseModel): - limit: int = 100 - - -class InsertRequest(BaseModel): - text: str - - -class Response(BaseModel): - status: str - data: Optional[Any] = None - message: Optional[str] = None - - -# API routes - -rag = None - - -@asynccontextmanager -async def lifespan(app: FastAPI): - global rag - rag = await init() - print("done!") - yield - - -app = FastAPI( - title="LightRAG API", description="API for RAG operations", lifespan=lifespan -) - - -@app.post("/query", response_model=Response) -async def query_endpoint(request: QueryRequest): - # try: - # loop = asyncio.get_event_loop() - if request.mode == "naive": - top_k = 3 - else: - top_k = 60 - result = await rag.aquery( - request.query, - param=QueryParam( - mode=request.mode, - only_need_context=request.only_need_context, - only_need_prompt=request.only_need_prompt, - top_k=top_k, - ), - ) - return Response(status="success", data=result) - # except Exception as e: - # raise HTTPException(status_code=500, detail=str(e)) - - -@app.get("/data", 
response_model=Response) -async def query_all_nodes(type: str = Query("nodes"), limit: int = Query(100)): - if type == "nodes": - result = await rag.chunk_entity_relation_graph.get_all_nodes(limit=limit) - elif type == "edges": - result = await rag.chunk_entity_relation_graph.get_all_edges(limit=limit) - elif type == "statistics": - result = await rag.chunk_entity_relation_graph.get_statistics() - return Response(status="success", data=result) - - -@app.post("/insert", response_model=Response) -async def insert_endpoint(request: InsertRequest): - try: - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: rag.insert(request.text)) - return Response(status="success", message="Text inserted successfully") - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@app.post("/insert_file", response_model=Response) -async def insert_file(file: UploadFile = File(...)): - try: - file_content = await file.read() - # Read file content - try: - content = file_content.decode("utf-8") - except UnicodeDecodeError: - # If UTF-8 decoding fails, try other encodings - content = file_content.decode("gbk") - # Insert file content - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, lambda: rag.insert(content)) - - return Response( - status="success", - message=f"File content from {file.filename} inserted successfully", - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@app.get("/health") -async def health_check(): - return {"status": "healthy"} - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="127.0.0.1", port=8020) - -# Usage example -# To run the server, use the following command in your terminal: -# python lightrag_api_openai_compatible_demo.py - -# Example requests: -# 1. Query: -# curl -X POST "http://127.0.0.1:8020/query" -H "Content-Type: application/json" -d '{"query": "your query here", "mode": "hybrid"}' - -# 2. Insert text: -# curl -X POST "http://127.0.0.1:8020/insert" -H "Content-Type: application/json" -d '{"text": "your text here"}' - -# 3. Insert file: -# curl -X POST "http://127.0.0.1:8020/insert_file" -H "Content-Type: multipart/form-data" -F "file=@path/to/your/file.txt" - - -# 4. 
Health check: -# curl -X GET "http://127.0.0.1:8020/health" diff --git a/examples/lightrag_oracle_demo.py b/examples/lightrag_oracle_demo.py deleted file mode 100644 index 6663f6a1..00000000 --- a/examples/lightrag_oracle_demo.py +++ /dev/null @@ -1,141 +0,0 @@ -import sys -import os -from pathlib import Path -import asyncio -from lightrag import LightRAG, QueryParam -from lightrag.llm.openai import openai_complete_if_cache, openai_embed -from lightrag.utils import EmbeddingFunc -import numpy as np -from lightrag.kg.shared_storage import initialize_pipeline_status - -print(os.getcwd()) -script_directory = Path(__file__).resolve().parent.parent -sys.path.append(os.path.abspath(script_directory)) - -WORKING_DIR = "./dickens" - -# We use OpenAI compatible API to call LLM on Oracle Cloud -# More docs here https://github.com/jin38324/OCI_GenAI_access_gateway -BASE_URL = "http://xxx.xxx.xxx.xxx:8088/v1/" -APIKEY = "ocigenerativeai" -CHATMODEL = "cohere.command-r-plus" -EMBEDMODEL = "cohere.embed-multilingual-v3.0" -CHUNK_TOKEN_SIZE = 1024 -MAX_TOKENS = 4000 - -if not os.path.exists(WORKING_DIR): - os.mkdir(WORKING_DIR) - -os.environ["ORACLE_USER"] = "username" -os.environ["ORACLE_PASSWORD"] = "xxxxxxxxx" -os.environ["ORACLE_DSN"] = "xxxxxxx_medium" -os.environ["ORACLE_CONFIG_DIR"] = "path_to_config_dir" -os.environ["ORACLE_WALLET_LOCATION"] = "path_to_wallet_location" -os.environ["ORACLE_WALLET_PASSWORD"] = "wallet_password" -os.environ["ORACLE_WORKSPACE"] = "company" - - -async def llm_model_func( - prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs -) -> str: - return await openai_complete_if_cache( - CHATMODEL, - prompt, - system_prompt=system_prompt, - history_messages=history_messages, - api_key=APIKEY, - base_url=BASE_URL, - **kwargs, - ) - - -async def embedding_func(texts: list[str]) -> np.ndarray: - return await openai_embed( - texts, - model=EMBEDMODEL, - api_key=APIKEY, - base_url=BASE_URL, - ) - - -async def get_embedding_dim(): - test_text = ["This is a test sentence."] - embedding = await embedding_func(test_text) - embedding_dim = embedding.shape[1] - return embedding_dim - - -async def initialize_rag(): - # Detect embedding dimension - embedding_dimension = await get_embedding_dim() - print(f"Detected embedding dimension: {embedding_dimension}") - - # Initialize LightRAG - # We use Oracle DB as the KV/vector/graph storage - # You can add `addon_params={"example_number": 1, "language": "Simplfied Chinese"}` to control the prompt - rag = LightRAG( - # log_level="DEBUG", - working_dir=WORKING_DIR, - entity_extract_max_gleaning=1, - enable_llm_cache=True, - enable_llm_cache_for_entity_extract=True, - embedding_cache_config=None, # {"enabled": True,"similarity_threshold": 0.90}, - chunk_token_size=CHUNK_TOKEN_SIZE, - llm_model_max_token_size=MAX_TOKENS, - llm_model_func=llm_model_func, - embedding_func=EmbeddingFunc( - embedding_dim=embedding_dimension, - max_token_size=500, - func=embedding_func, - ), - graph_storage="OracleGraphStorage", - kv_storage="OracleKVStorage", - vector_storage="OracleVectorDBStorage", - addon_params={ - "example_number": 1, - "language": "Simplfied Chinese", - "entity_types": ["organization", "person", "geo", "event"], - "insert_batch_size": 2, - }, - ) - await rag.initialize_storages() - await initialize_pipeline_status() - - return rag - - -async def main(): - try: - # Initialize RAG instance - rag = await initialize_rag() - - # Extract and Insert into LightRAG storage - with open(WORKING_DIR + "/docs.txt", "r", 
encoding="utf-8") as f: - all_text = f.read() - texts = [x for x in all_text.split("\n") if x] - - # New mode use pipeline - await rag.apipeline_enqueue_documents(texts) - await rag.apipeline_process_enqueue_documents() - - # Old method use ainsert - # await rag.ainsert(texts) - - # Perform search in different modes - modes = ["naive", "local", "global", "hybrid"] - for mode in modes: - print("=" * 20, mode, "=" * 20) - print( - await rag.aquery( - "What are the top themes in this story?", - param=QueryParam(mode=mode), - ) - ) - print("-" * 100, "\n") - - except Exception as e: - print(f"An error occurred: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/lightrag/api/README-zh.md b/lightrag/api/README-zh.md index 4bf31a61..aca3d44a 100644 --- a/lightrag/api/README-zh.md +++ b/lightrag/api/README-zh.md @@ -291,11 +291,10 @@ LightRAG 使用 4 种类型的存储用于不同目的: ``` JsonKVStorage JsonFile(默认) -MongoKVStorage MogonDB -RedisKVStorage Redis -TiDBKVStorage TiDB PGKVStorage Postgres -OracleKVStorage Oracle +RedisKVStorage Redis +MongoKVStorage MogonDB +TiDBKVStorage TiDB ``` * GRAPH_STORAGE 支持的实现名称 @@ -303,25 +302,21 @@ OracleKVStorage Oracle ``` NetworkXStorage NetworkX(默认) Neo4JStorage Neo4J -MongoGraphStorage MongoDB -TiDBGraphStorage TiDB +PGGraphStorage Postgres AGEStorage AGE GremlinStorage Gremlin -PGGraphStorage Postgres -OracleGraphStorage Postgres ``` * VECTOR_STORAGE 支持的实现名称 ``` NanoVectorDBStorage NanoVector(默认) +PGVectorStorage Postgres MilvusVectorDBStorge Milvus ChromaVectorDBStorage Chroma -TiDBVectorDBStorage TiDB -PGVectorStorage Postgres FaissVectorDBStorage Faiss QdrantVectorDBStorage Qdrant -OracleVectorDBStorage Oracle +TiDBVectorDBStorage TiDB MongoVectorDBStorage MongoDB ``` diff --git a/lightrag/api/README.md b/lightrag/api/README.md index 8b2e8177..fdd15008 100644 --- a/lightrag/api/README.md +++ b/lightrag/api/README.md @@ -302,11 +302,10 @@ Each storage type have servals implementations: ``` JsonKVStorage JsonFile(default) -MongoKVStorage MogonDB -RedisKVStorage Redis -TiDBKVStorage TiDB PGKVStorage Postgres -OracleKVStorage Oracle +RedisKVStorage Redis +MongoKVStorage MogonDB +TiDBKVStorage TiDB ``` * GRAPH_STORAGE supported implement-name @@ -314,25 +313,21 @@ OracleKVStorage Oracle ``` NetworkXStorage NetworkX(defualt) Neo4JStorage Neo4J -MongoGraphStorage MongoDB -TiDBGraphStorage TiDB +PGGraphStorage Postgres AGEStorage AGE GremlinStorage Gremlin -PGGraphStorage Postgres -OracleGraphStorage Postgres ``` * VECTOR_STORAGE supported implement-name ``` NanoVectorDBStorage NanoVector(default) -MilvusVectorDBStorage Milvus -ChromaVectorDBStorage Chroma -TiDBVectorDBStorage TiDB PGVectorStorage Postgres +MilvusVectorDBStorge Milvus +ChromaVectorDBStorage Chroma FaissVectorDBStorage Faiss QdrantVectorDBStorage Qdrant -OracleVectorDBStorage Oracle +TiDBVectorDBStorage TiDB MongoVectorDBStorage MongoDB ``` diff --git a/lightrag/kg/__init__.py b/lightrag/kg/__init__.py index 91d42d81..445b9e22 100644 --- a/lightrag/kg/__init__.py +++ b/lightrag/kg/__init__.py @@ -6,7 +6,6 @@ STORAGE_IMPLEMENTATIONS = { "RedisKVStorage", "TiDBKVStorage", "PGKVStorage", - "OracleKVStorage", ], "required_methods": ["get_by_id", "upsert"], }, @@ -19,7 +18,6 @@ STORAGE_IMPLEMENTATIONS = { "AGEStorage", "GremlinStorage", "PGGraphStorage", - # "OracleGraphStorage", ], "required_methods": ["upsert_node", "upsert_edge"], }, @@ -32,7 +30,6 @@ STORAGE_IMPLEMENTATIONS = { "PGVectorStorage", "FaissVectorDBStorage", "QdrantVectorDBStorage", - "OracleVectorDBStorage", "MongoVectorDBStorage", 
], "required_methods": ["query", "upsert"], @@ -41,7 +38,6 @@ STORAGE_IMPLEMENTATIONS = { "implementations": [ "JsonDocStatusStorage", "PGDocStatusStorage", - "PGDocStatusStorage", "MongoDocStatusStorage", ], "required_methods": ["get_docs_by_status"], @@ -56,12 +52,6 @@ STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { "RedisKVStorage": ["REDIS_URI"], "TiDBKVStorage": ["TIDB_USER", "TIDB_PASSWORD", "TIDB_DATABASE"], "PGKVStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], - "OracleKVStorage": [ - "ORACLE_DSN", - "ORACLE_USER", - "ORACLE_PASSWORD", - "ORACLE_CONFIG_DIR", - ], # Graph Storage Implementations "NetworkXStorage": [], "Neo4JStorage": ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD"], @@ -78,12 +68,6 @@ STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { "POSTGRES_PASSWORD", "POSTGRES_DATABASE", ], - "OracleGraphStorage": [ - "ORACLE_DSN", - "ORACLE_USER", - "ORACLE_PASSWORD", - "ORACLE_CONFIG_DIR", - ], # Vector Storage Implementations "NanoVectorDBStorage": [], "MilvusVectorDBStorage": [], @@ -92,12 +76,6 @@ STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { "PGVectorStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], "FaissVectorDBStorage": [], "QdrantVectorDBStorage": ["QDRANT_URL"], # QDRANT_API_KEY has default value None - "OracleVectorDBStorage": [ - "ORACLE_DSN", - "ORACLE_USER", - "ORACLE_PASSWORD", - "ORACLE_CONFIG_DIR", - ], "MongoVectorDBStorage": [], # Document Status Storage Implementations "JsonDocStatusStorage": [], @@ -112,9 +90,6 @@ STORAGES = { "NanoVectorDBStorage": ".kg.nano_vector_db_impl", "JsonDocStatusStorage": ".kg.json_doc_status_impl", "Neo4JStorage": ".kg.neo4j_impl", - "OracleKVStorage": ".kg.oracle_impl", - "OracleGraphStorage": ".kg.oracle_impl", - "OracleVectorDBStorage": ".kg.oracle_impl", "MilvusVectorDBStorage": ".kg.milvus_impl", "MongoKVStorage": ".kg.mongo_impl", "MongoDocStatusStorage": ".kg.mongo_impl", diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py deleted file mode 100644 index 7ba9f428..00000000 --- a/lightrag/kg/oracle_impl.py +++ /dev/null @@ -1,1463 +0,0 @@ -import array -import asyncio - -# import html -import os -from dataclasses import dataclass, field -from typing import Any, Union, final -import numpy as np -import configparser - -from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge - -from ..base import ( - BaseGraphStorage, - BaseKVStorage, - BaseVectorStorage, -) -from ..namespace import NameSpace, is_namespace -from ..utils import logger - -import pipmaster as pm - -if not pm.is_installed("graspologic"): - pm.install("graspologic") - -if not pm.is_installed("oracledb"): - pm.install("oracledb") - -from graspologic import embed -import oracledb # type: ignore - - -class OracleDB: - def __init__(self, config, **kwargs): - self.host = config.get("host", None) - self.port = config.get("port", None) - self.user = config.get("user", None) - self.password = config.get("password", None) - self.dsn = config.get("dsn", None) - self.config_dir = config.get("config_dir", None) - self.wallet_location = config.get("wallet_location", None) - self.wallet_password = config.get("wallet_password", None) - self.workspace = config.get("workspace", None) - self.max = 12 - self.increment = 1 - logger.info(f"Using the label {self.workspace} for Oracle Graph as identifier") - if self.user is None or self.password is None: - raise ValueError("Missing database user or password") - - try: - oracledb.defaults.fetch_lobs = False - - self.pool = 
oracledb.create_pool_async( - user=self.user, - password=self.password, - dsn=self.dsn, - config_dir=self.config_dir, - wallet_location=self.wallet_location, - wallet_password=self.wallet_password, - min=1, - max=self.max, - increment=self.increment, - ) - logger.info(f"Connected to Oracle database at {self.dsn}") - except Exception as e: - logger.error(f"Failed to connect to Oracle database at {self.dsn}") - logger.error(f"Oracle database error: {e}") - raise - - def numpy_converter_in(self, value): - """Convert numpy array to array.array""" - if value.dtype == np.float64: - dtype = "d" - elif value.dtype == np.float32: - dtype = "f" - else: - dtype = "b" - return array.array(dtype, value) - - def input_type_handler(self, cursor, value, arraysize): - """Set the type handler for the input data""" - if isinstance(value, np.ndarray): - return cursor.var( - oracledb.DB_TYPE_VECTOR, - arraysize=arraysize, - inconverter=self.numpy_converter_in, - ) - - def numpy_converter_out(self, value): - """Convert array.array to numpy array""" - if value.typecode == "b": - dtype = np.int8 - elif value.typecode == "f": - dtype = np.float32 - else: - dtype = np.float64 - return np.array(value, copy=False, dtype=dtype) - - def output_type_handler(self, cursor, metadata): - """Set the type handler for the output data""" - if metadata.type_code is oracledb.DB_TYPE_VECTOR: - return cursor.var( - metadata.type_code, - arraysize=cursor.arraysize, - outconverter=self.numpy_converter_out, - ) - - async def check_tables(self): - for k, v in TABLES.items(): - try: - if k.lower() == "lightrag_graph": - await self.query( - "SELECT id FROM GRAPH_TABLE (lightrag_graph MATCH (a) COLUMNS (a.id)) fetch first row only" - ) - else: - await self.query(f"SELECT 1 FROM {k}") - except Exception as e: - logger.error(f"Failed to check table {k} in Oracle database") - logger.error(f"Oracle database error: {e}") - try: - # print(v["ddl"]) - await self.execute(v["ddl"]) - logger.info(f"Created table {k} in Oracle database") - except Exception as e: - logger.error(f"Failed to create table {k} in Oracle database") - logger.error(f"Oracle database error: {e}") - - logger.info("Finished check all tables in Oracle database") - - async def query( - self, sql: str, params: dict = None, multirows: bool = False - ) -> Union[dict, None]: - async with self.pool.acquire() as connection: - connection.inputtypehandler = self.input_type_handler - connection.outputtypehandler = self.output_type_handler - with connection.cursor() as cursor: - try: - await cursor.execute(sql, params) - except Exception as e: - logger.error(f"Oracle database error: {e}") - raise - columns = [column[0].lower() for column in cursor.description] - if multirows: - rows = await cursor.fetchall() - if rows: - data = [dict(zip(columns, row)) for row in rows] - else: - data = [] - else: - row = await cursor.fetchone() - if row: - data = dict(zip(columns, row)) - else: - data = None - return data - - async def execute(self, sql: str, data: Union[list, dict] = None): - # logger.info("go into OracleDB execute method") - try: - async with self.pool.acquire() as connection: - connection.inputtypehandler = self.input_type_handler - connection.outputtypehandler = self.output_type_handler - with connection.cursor() as cursor: - if data is None: - await cursor.execute(sql) - else: - await cursor.execute(sql, data) - await connection.commit() - except Exception as e: - logger.error(f"Oracle database error: {e}") - raise - - -class ClientManager: - _instances: dict[str, Any] = {"db": None, 
"ref_count": 0} - _lock = asyncio.Lock() - - @staticmethod - def get_config() -> dict[str, Any]: - config = configparser.ConfigParser() - config.read("config.ini", "utf-8") - - return { - "user": os.environ.get( - "ORACLE_USER", - config.get("oracle", "user", fallback=None), - ), - "password": os.environ.get( - "ORACLE_PASSWORD", - config.get("oracle", "password", fallback=None), - ), - "dsn": os.environ.get( - "ORACLE_DSN", - config.get("oracle", "dsn", fallback=None), - ), - "config_dir": os.environ.get( - "ORACLE_CONFIG_DIR", - config.get("oracle", "config_dir", fallback=None), - ), - "wallet_location": os.environ.get( - "ORACLE_WALLET_LOCATION", - config.get("oracle", "wallet_location", fallback=None), - ), - "wallet_password": os.environ.get( - "ORACLE_WALLET_PASSWORD", - config.get("oracle", "wallet_password", fallback=None), - ), - "workspace": os.environ.get( - "ORACLE_WORKSPACE", - config.get("oracle", "workspace", fallback="default"), - ), - } - - @classmethod - async def get_client(cls) -> OracleDB: - async with cls._lock: - if cls._instances["db"] is None: - config = ClientManager.get_config() - db = OracleDB(config) - await db.check_tables() - cls._instances["db"] = db - cls._instances["ref_count"] = 0 - cls._instances["ref_count"] += 1 - return cls._instances["db"] - - @classmethod - async def release_client(cls, db: OracleDB): - async with cls._lock: - if db is not None: - if db is cls._instances["db"]: - cls._instances["ref_count"] -= 1 - if cls._instances["ref_count"] == 0: - await db.pool.close() - logger.info("Closed OracleDB database connection pool") - cls._instances["db"] = None - else: - await db.pool.close() - - -@final -@dataclass -class OracleKVStorage(BaseKVStorage): - db: OracleDB = field(default=None) - meta_fields = None - - def __post_init__(self): - self._data = {} - self._max_batch_size = self.global_config.get("embedding_batch_num", 10) - - async def initialize(self): - if self.db is None: - self.db = await ClientManager.get_client() - - async def finalize(self): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - - ################ QUERY METHODS ################ - - async def get_by_id(self, id: str) -> dict[str, Any] | None: - """Get doc_full data based on id.""" - SQL = SQL_TEMPLATES["get_by_id_" + self.namespace] - params = {"workspace": self.db.workspace, "id": id} - # print("get_by_id:"+SQL) - if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE): - array_res = await self.db.query(SQL, params, multirows=True) - res = {} - for row in array_res: - res[row["id"]] = row - if res: - return res - else: - return None - else: - return await self.db.query(SQL, params) - - async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]: - """Specifically for llm_response_cache.""" - SQL = SQL_TEMPLATES["get_by_mode_id_" + self.namespace] - params = {"workspace": self.db.workspace, "cache_mode": mode, "id": id} - if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE): - array_res = await self.db.query(SQL, params, multirows=True) - res = {} - for row in array_res: - res[row["id"]] = row - return res - else: - return None - - async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - """Get doc_chunks data based on id""" - SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format( - ids=",".join([f"'{id}'" for id in ids]) - ) - params = {"workspace": self.db.workspace} - # print("get_by_ids:"+SQL) - res = await self.db.query(SQL, params, multirows=True) - if 
is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE): - modes = set() - dict_res: dict[str, dict] = {} - for row in res: - modes.add(row["mode"]) - for mode in modes: - if mode not in dict_res: - dict_res[mode] = {} - for row in res: - dict_res[row["mode"]][row["id"]] = row - res = [{k: v} for k, v in dict_res.items()] - return res - - async def filter_keys(self, keys: set[str]) -> set[str]: - """Return keys that don't exist in storage""" - SQL = SQL_TEMPLATES["filter_keys"].format( - table_name=namespace_to_table_name(self.namespace), - ids=",".join([f"'{id}'" for id in keys]), - ) - params = {"workspace": self.db.workspace} - res = await self.db.query(SQL, params, multirows=True) - if res: - exist_keys = [key["id"] for key in res] - data = set([s for s in keys if s not in exist_keys]) - return data - else: - return set(keys) - - ################ INSERT METHODS ################ - async def upsert(self, data: dict[str, dict[str, Any]]) -> None: - logger.info(f"Inserting {len(data)} to {self.namespace}") - if not data: - return - - if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS): - list_data = [ - { - "id": k, - **{k1: v1 for k1, v1 in v.items()}, - } - for k, v in data.items() - ] - contents = [v["content"] for v in data.values()] - batches = [ - contents[i : i + self._max_batch_size] - for i in range(0, len(contents), self._max_batch_size) - ] - embeddings_list = await asyncio.gather( - *[self.embedding_func(batch) for batch in batches] - ) - embeddings = np.concatenate(embeddings_list) - for i, d in enumerate(list_data): - d["__vector__"] = embeddings[i] - - merge_sql = SQL_TEMPLATES["merge_chunk"] - for item in list_data: - _data = { - "id": item["id"], - "content": item["content"], - "workspace": self.db.workspace, - "tokens": item["tokens"], - "chunk_order_index": item["chunk_order_index"], - "full_doc_id": item["full_doc_id"], - "content_vector": item["__vector__"], - "status": item["status"], - } - await self.db.execute(merge_sql, _data) - if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_DOCS): - for k, v in data.items(): - # values.clear() - merge_sql = SQL_TEMPLATES["merge_doc_full"] - _data = { - "id": k, - "content": v["content"], - "workspace": self.db.workspace, - } - await self.db.execute(merge_sql, _data) - - if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE): - for mode, items in data.items(): - for k, v in items.items(): - upsert_sql = SQL_TEMPLATES["upsert_llm_response_cache"] - _data = { - "workspace": self.db.workspace, - "id": k, - "original_prompt": v["original_prompt"], - "return_value": v["return"], - "cache_mode": mode, - } - - await self.db.execute(upsert_sql, _data) - - async def index_done_callback(self) -> None: - # Oracle handles persistence automatically - pass - - async def delete(self, ids: list[str]) -> None: - """Delete records with specified IDs from the storage. 
- - Args: - ids: List of record IDs to be deleted - """ - if not ids: - return - - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - logger.error(f"Unknown namespace for deletion: {self.namespace}") - return - - ids_list = ",".join([f"'{id}'" for id in ids]) - delete_sql = f"DELETE FROM {table_name} WHERE workspace=:workspace AND id IN ({ids_list})" - - await self.db.execute(delete_sql, {"workspace": self.db.workspace}) - logger.info( - f"Successfully deleted {len(ids)} records from {self.namespace}" - ) - except Exception as e: - logger.error(f"Error deleting records from {self.namespace}: {e}") - - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - bool: True if successful, False otherwise - """ - if not modes: - return False - - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return False - - if table_name != "LIGHTRAG_LLM_CACHE": - return False - - # 构建Oracle风格的IN查询 - modes_list = ", ".join([f"'{mode}'" for mode in modes]) - sql = f""" - DELETE FROM {table_name} - WHERE workspace = :workspace - AND cache_mode IN ({modes_list}) - """ - - logger.info(f"Deleting cache by modes: {modes}") - await self.db.execute(sql, {"workspace": self.db.workspace}) - return True - except Exception as e: - logger.error(f"Error deleting cache by modes {modes}: {e}") - return False - - async def drop(self) -> dict[str, str]: - """Drop the storage""" - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return { - "status": "error", - "message": f"Unknown namespace: {self.namespace}", - } - - drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( - table_name=table_name - ) - await self.db.execute(drop_sql, {"workspace": self.db.workspace}) - return {"status": "success", "message": "data dropped"} - except Exception as e: - return {"status": "error", "message": str(e)} - - -@final -@dataclass -class OracleVectorDBStorage(BaseVectorStorage): - db: OracleDB | None = field(default=None) - - def __post_init__(self): - config = self.global_config.get("vector_db_storage_cls_kwargs", {}) - cosine_threshold = config.get("cosine_better_than_threshold") - if cosine_threshold is None: - raise ValueError( - "cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs" - ) - self.cosine_better_than_threshold = cosine_threshold - - async def initialize(self): - if self.db is None: - self.db = await ClientManager.get_client() - - async def finalize(self): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - - #################### query method ############### - async def query( - self, query: str, top_k: int, ids: list[str] | None = None - ) -> list[dict[str, Any]]: - embeddings = await self.embedding_func([query]) - embedding = embeddings[0] - # 转换精度 - dtype = str(embedding.dtype).upper() - dimension = embedding.shape[0] - embedding_string = "[" + ", ".join(map(str, embedding.tolist())) + "]" - - SQL = SQL_TEMPLATES[self.namespace].format(dimension=dimension, dtype=dtype) - params = { - "embedding_string": embedding_string, - "workspace": self.db.workspace, - "top_k": top_k, - "better_than_threshold": self.cosine_better_than_threshold, - } - results = await self.db.query(SQL, params=params, multirows=True) - return results - - async def upsert(self, data: dict[str, dict[str, Any]]) -> 
None: - raise NotImplementedError - - async def index_done_callback(self) -> None: - # Oracles handles persistence automatically - pass - - async def delete(self, ids: list[str]) -> None: - """Delete vectors with specified IDs - - Args: - ids: List of vector IDs to be deleted - """ - if not ids: - return - - try: - SQL = SQL_TEMPLATES["delete_vectors"].format( - ids=",".join([f"'{id}'" for id in ids]) - ) - params = {"workspace": self.db.workspace} - await self.db.execute(SQL, params) - logger.info( - f"Successfully deleted {len(ids)} vectors from {self.namespace}" - ) - except Exception as e: - logger.error(f"Error while deleting vectors from {self.namespace}: {e}") - raise - - async def delete_entity(self, entity_name: str) -> None: - """Delete entity by name - - Args: - entity_name: Name of the entity to delete - """ - try: - SQL = SQL_TEMPLATES["delete_entity"] - params = {"workspace": self.db.workspace, "entity_name": entity_name} - await self.db.execute(SQL, params) - logger.info(f"Successfully deleted entity {entity_name}") - except Exception as e: - logger.error(f"Error deleting entity {entity_name}: {e}") - raise - - async def delete_entity_relation(self, entity_name: str) -> None: - """Delete all relations connected to an entity - - Args: - entity_name: Name of the entity whose relations should be deleted - """ - try: - SQL = SQL_TEMPLATES["delete_entity_relations"] - params = {"workspace": self.db.workspace, "entity_name": entity_name} - await self.db.execute(SQL, params) - logger.info(f"Successfully deleted relations for entity {entity_name}") - except Exception as e: - logger.error(f"Error deleting relations for entity {entity_name}: {e}") - raise - - async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]: - """Search for records with IDs starting with a specific prefix. 
- - Args: - prefix: The prefix to search for in record IDs - - Returns: - List of records with matching ID prefixes - """ - try: - # Determine the appropriate table based on namespace - table_name = namespace_to_table_name(self.namespace) - - # Create SQL query to find records with IDs starting with prefix - search_sql = f""" - SELECT * FROM {table_name} - WHERE workspace = :workspace - AND id LIKE :prefix_pattern - ORDER BY id - """ - - params = {"workspace": self.db.workspace, "prefix_pattern": f"{prefix}%"} - - # Execute query and get results - results = await self.db.query(search_sql, params, multirows=True) - - logger.debug( - f"Found {len(results) if results else 0} records with prefix '{prefix}'" - ) - return results or [] - - except Exception as e: - logger.error(f"Error searching records with prefix '{prefix}': {e}") - return [] - - async def get_by_id(self, id: str) -> dict[str, Any] | None: - """Get vector data by its ID - - Args: - id: The unique identifier of the vector - - Returns: - The vector data if found, or None if not found - """ - try: - # Determine the table name based on namespace - table_name = namespace_to_table_name(self.namespace) - if not table_name: - logger.error(f"Unknown namespace for ID lookup: {self.namespace}") - return None - - # Create the appropriate ID field name based on namespace - id_field = "entity_id" if "NODES" in table_name else "relation_id" - if "CHUNKS" in table_name: - id_field = "chunk_id" - - # Prepare and execute the query - query = f""" - SELECT * FROM {table_name} - WHERE {id_field} = :id AND workspace = :workspace - """ - params = {"id": id, "workspace": self.db.workspace} - - result = await self.db.query(query, params) - return result - except Exception as e: - logger.error(f"Error retrieving vector data for ID {id}: {e}") - return None - - async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - """Get multiple vector data by their IDs - - Args: - ids: List of unique identifiers - - Returns: - List of vector data objects that were found - """ - if not ids: - return [] - - try: - # Determine the table name based on namespace - table_name = namespace_to_table_name(self.namespace) - if not table_name: - logger.error(f"Unknown namespace for IDs lookup: {self.namespace}") - return [] - - # Create the appropriate ID field name based on namespace - id_field = "entity_id" if "NODES" in table_name else "relation_id" - if "CHUNKS" in table_name: - id_field = "chunk_id" - - # Format the list of IDs for SQL IN clause - ids_list = ", ".join([f"'{id}'" for id in ids]) - - # Prepare and execute the query - query = f""" - SELECT * FROM {table_name} - WHERE {id_field} IN ({ids_list}) AND workspace = :workspace - """ - params = {"workspace": self.db.workspace} - - results = await self.db.query(query, params, multirows=True) - return results or [] - except Exception as e: - logger.error(f"Error retrieving vector data for IDs {ids}: {e}") - return [] - - async def drop(self) -> dict[str, str]: - """Drop the storage""" - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return { - "status": "error", - "message": f"Unknown namespace: {self.namespace}", - } - - drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( - table_name=table_name - ) - await self.db.execute(drop_sql, {"workspace": self.db.workspace}) - return {"status": "success", "message": "data dropped"} - except Exception as e: - return {"status": "error", "message": str(e)} - - -@final -@dataclass -class 
OracleGraphStorage(BaseGraphStorage): - db: OracleDB = field(default=None) - - def __post_init__(self): - self._max_batch_size = self.global_config.get("embedding_batch_num", 10) - - async def initialize(self): - if self.db is None: - self.db = await ClientManager.get_client() - - async def finalize(self): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - - #################### insert method ################ - - async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None: - entity_name = node_id - entity_type = node_data["entity_type"] - description = node_data["description"] - source_id = node_data["source_id"] - logger.debug(f"entity_name:{entity_name}, entity_type:{entity_type}") - - content = entity_name + description - contents = [content] - batches = [ - contents[i : i + self._max_batch_size] - for i in range(0, len(contents), self._max_batch_size) - ] - embeddings_list = await asyncio.gather( - *[self.embedding_func(batch) for batch in batches] - ) - embeddings = np.concatenate(embeddings_list) - content_vector = embeddings[0] - merge_sql = SQL_TEMPLATES["merge_node"] - data = { - "workspace": self.db.workspace, - "name": entity_name, - "entity_type": entity_type, - "description": description, - "source_chunk_id": source_id, - "content": content, - "content_vector": content_vector, - } - await self.db.execute(merge_sql, data) - # self._graph.add_node(node_id, **node_data) - - async def upsert_edge( - self, source_node_id: str, target_node_id: str, edge_data: dict[str, str] - ) -> None: - """插入或更新边""" - # print("go into upsert edge method") - source_name = source_node_id - target_name = target_node_id - weight = edge_data["weight"] - keywords = edge_data["keywords"] - description = edge_data["description"] - source_chunk_id = edge_data["source_id"] - logger.debug( - f"source_name:{source_name}, target_name:{target_name}, keywords: {keywords}" - ) - - content = keywords + source_name + target_name + description - contents = [content] - batches = [ - contents[i : i + self._max_batch_size] - for i in range(0, len(contents), self._max_batch_size) - ] - embeddings_list = await asyncio.gather( - *[self.embedding_func(batch) for batch in batches] - ) - embeddings = np.concatenate(embeddings_list) - content_vector = embeddings[0] - merge_sql = SQL_TEMPLATES["merge_edge"] - data = { - "workspace": self.db.workspace, - "source_name": source_name, - "target_name": target_name, - "weight": weight, - "keywords": keywords, - "description": description, - "source_chunk_id": source_chunk_id, - "content": content, - "content_vector": content_vector, - } - # print(merge_sql) - await self.db.execute(merge_sql, data) - # self._graph.add_edge(source_node_id, target_node_id, **edge_data) - - async def embed_nodes( - self, algorithm: str - ) -> tuple[np.ndarray[Any, Any], list[str]]: - if algorithm not in self._node_embed_algorithms: - raise ValueError(f"Node embedding algorithm {algorithm} not supported") - return await self._node_embed_algorithms[algorithm]() - - async def _node2vec_embed(self): - """为节点生成向量""" - embeddings, nodes = embed.node2vec_embed( - self._graph, - **self.config["node2vec_params"], - ) - - nodes_ids = [self._graph.nodes[node_id]["id"] for node_id in nodes] - return embeddings, nodes_ids - - async def index_done_callback(self) -> None: - # Oracles handles persistence automatically - pass - - #################### query method ################# - async def has_node(self, node_id: str) -> bool: - """根据节点id检查节点是否存在""" - SQL = 
SQL_TEMPLATES["has_node"] - params = {"workspace": self.db.workspace, "node_id": node_id} - res = await self.db.query(SQL, params) - if res: - # print("Node exist!",res) - return True - else: - # print("Node not exist!") - return False - - async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: - SQL = SQL_TEMPLATES["has_edge"] - params = { - "workspace": self.db.workspace, - "source_node_id": source_node_id, - "target_node_id": target_node_id, - } - res = await self.db.query(SQL, params) - if res: - # print("Edge exist!",res) - return True - else: - # print("Edge not exist!") - return False - - async def node_degree(self, node_id: str) -> int: - SQL = SQL_TEMPLATES["node_degree"] - params = {"workspace": self.db.workspace, "node_id": node_id} - res = await self.db.query(SQL, params) - if res: - return res["degree"] - else: - return 0 - - async def edge_degree(self, src_id: str, tgt_id: str) -> int: - """根据源和目标节点id获取边的度""" - degree = await self.node_degree(src_id) + await self.node_degree(tgt_id) - return degree - - async def get_node(self, node_id: str) -> dict[str, str] | None: - """根据节点id获取节点数据""" - SQL = SQL_TEMPLATES["get_node"] - params = {"workspace": self.db.workspace, "node_id": node_id} - res = await self.db.query(SQL, params) - if res: - return res - else: - return None - - async def get_edge( - self, source_node_id: str, target_node_id: str - ) -> dict[str, str] | None: - SQL = SQL_TEMPLATES["get_edge"] - params = { - "workspace": self.db.workspace, - "source_node_id": source_node_id, - "target_node_id": target_node_id, - } - res = await self.db.query(SQL, params) - if res: - # print("Get edge!",self.db.workspace, source_node_id, target_node_id,res[0]) - return res - else: - # print("Edge not exist!",self.db.workspace, source_node_id, target_node_id) - return None - - async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: - if await self.has_node(source_node_id): - SQL = SQL_TEMPLATES["get_node_edges"] - params = {"workspace": self.db.workspace, "source_node_id": source_node_id} - res = await self.db.query(sql=SQL, params=params, multirows=True) - if res: - data = [(i["source_name"], i["target_name"]) for i in res] - # print("Get node edge!",self.db.workspace, source_node_id,data) - return data - else: - # print("Node Edge not exist!",self.db.workspace, source_node_id) - return [] - - async def get_all_nodes(self, limit: int): - """查询所有节点""" - SQL = SQL_TEMPLATES["get_all_nodes"] - params = {"workspace": self.db.workspace, "limit": str(limit)} - res = await self.db.query(sql=SQL, params=params, multirows=True) - if res: - return res - - async def get_all_edges(self, limit: int): - """查询所有边""" - SQL = SQL_TEMPLATES["get_all_edges"] - params = {"workspace": self.db.workspace, "limit": str(limit)} - res = await self.db.query(sql=SQL, params=params, multirows=True) - if res: - return res - - async def get_statistics(self): - SQL = SQL_TEMPLATES["get_statistics"] - params = {"workspace": self.db.workspace} - res = await self.db.query(sql=SQL, params=params, multirows=True) - if res: - return res - - async def delete_node(self, node_id: str) -> None: - """Delete a node from the graph - - Args: - node_id: ID of the node to delete - """ - try: - # First delete all relations connected to this node - delete_relations_sql = SQL_TEMPLATES["delete_entity_relations"] - params_relations = {"workspace": self.db.workspace, "entity_name": node_id} - await self.db.execute(delete_relations_sql, params_relations) - - # Then delete the node itself - 
delete_node_sql = SQL_TEMPLATES["delete_entity"] - params_node = {"workspace": self.db.workspace, "entity_name": node_id} - await self.db.execute(delete_node_sql, params_node) - - logger.info( - f"Successfully deleted node {node_id} and all its relationships" - ) - except Exception as e: - logger.error(f"Error deleting node {node_id}: {e}") - raise - - async def remove_nodes(self, nodes: list[str]) -> None: - """Delete multiple nodes from the graph - - Args: - nodes: List of node IDs to be deleted - """ - if not nodes: - return - - try: - for node in nodes: - # For each node, first delete all its relationships - delete_relations_sql = SQL_TEMPLATES["delete_entity_relations"] - params_relations = {"workspace": self.db.workspace, "entity_name": node} - await self.db.execute(delete_relations_sql, params_relations) - - # Then delete the node itself - delete_node_sql = SQL_TEMPLATES["delete_entity"] - params_node = {"workspace": self.db.workspace, "entity_name": node} - await self.db.execute(delete_node_sql, params_node) - - logger.info( - f"Successfully deleted {len(nodes)} nodes and their relationships" - ) - except Exception as e: - logger.error(f"Error during batch node deletion: {e}") - raise - - async def remove_edges(self, edges: list[tuple[str, str]]) -> None: - """Delete multiple edges from the graph - - Args: - edges: List of edges to be deleted, each edge is a (source, target) tuple - """ - if not edges: - return - - try: - for source, target in edges: - # Check if the edge exists before attempting to delete - if await self.has_edge(source, target): - # Delete the edge using a SQL query that matches both source and target - delete_edge_sql = """ - DELETE FROM LIGHTRAG_GRAPH_EDGES - WHERE workspace = :workspace - AND source_name = :source_name - AND target_name = :target_name - """ - params = { - "workspace": self.db.workspace, - "source_name": source, - "target_name": target, - } - await self.db.execute(delete_edge_sql, params) - - logger.info(f"Successfully deleted {len(edges)} edges from the graph") - except Exception as e: - logger.error(f"Error during batch edge deletion: {e}") - raise - - async def get_all_labels(self) -> list[str]: - """Get all unique entity types (labels) in the graph - - Returns: - List of unique entity types/labels - """ - try: - SQL = """ - SELECT DISTINCT entity_type - FROM LIGHTRAG_GRAPH_NODES - WHERE workspace = :workspace - ORDER BY entity_type - """ - params = {"workspace": self.db.workspace} - results = await self.db.query(SQL, params, multirows=True) - - if results: - labels = [row["entity_type"] for row in results] - return labels - else: - return [] - except Exception as e: - logger.error(f"Error retrieving entity types: {e}") - return [] - - async def drop(self) -> dict[str, str]: - """Drop the storage""" - try: - # 使用图形查询删除所有节点和关系 - delete_edges_sql = ( - """DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace""" - ) - await self.db.execute(delete_edges_sql, {"workspace": self.db.workspace}) - - delete_nodes_sql = ( - """DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace""" - ) - await self.db.execute(delete_nodes_sql, {"workspace": self.db.workspace}) - - return {"status": "success", "message": "graph data dropped"} - except Exception as e: - logger.error(f"Error dropping graph: {e}") - return {"status": "error", "message": str(e)} - - async def get_knowledge_graph( - self, node_label: str, max_depth: int = 5 - ) -> KnowledgeGraph: - """Retrieve a connected subgraph starting from nodes matching the given label - - Maximum number 
of nodes is constrained by MAX_GRAPH_NODES environment variable. - Prioritizes nodes by: - 1. Nodes matching the specified label - 2. Nodes directly connected to matching nodes - 3. Node degree (number of connections) - - Args: - node_label: Label to match for starting nodes (use "*" for all nodes) - max_depth: Maximum depth of traversal from starting nodes - - Returns: - KnowledgeGraph object containing nodes and edges - """ - result = KnowledgeGraph() - - try: - # Define maximum number of nodes to return - max_graph_nodes = int(os.environ.get("MAX_GRAPH_NODES", 1000)) - - if node_label == "*": - # For "*" label, get all nodes up to the limit - nodes_sql = """ - SELECT name, entity_type, description, source_chunk_id - FROM LIGHTRAG_GRAPH_NODES - WHERE workspace = :workspace - ORDER BY id - FETCH FIRST :limit ROWS ONLY - """ - nodes_params = { - "workspace": self.db.workspace, - "limit": max_graph_nodes, - } - nodes = await self.db.query(nodes_sql, nodes_params, multirows=True) - else: - # For specific label, find matching nodes and related nodes - nodes_sql = """ - WITH matching_nodes AS ( - SELECT name - FROM LIGHTRAG_GRAPH_NODES - WHERE workspace = :workspace - AND (name LIKE '%' || :node_label || '%' OR entity_type LIKE '%' || :node_label || '%') - ) - SELECT n.name, n.entity_type, n.description, n.source_chunk_id, - CASE - WHEN n.name IN (SELECT name FROM matching_nodes) THEN 2 - WHEN EXISTS ( - SELECT 1 FROM LIGHTRAG_GRAPH_EDGES e - WHERE workspace = :workspace - AND ((e.source_name = n.name AND e.target_name IN (SELECT name FROM matching_nodes)) - OR (e.target_name = n.name AND e.source_name IN (SELECT name FROM matching_nodes))) - ) THEN 1 - ELSE 0 - END AS priority, - (SELECT COUNT(*) FROM LIGHTRAG_GRAPH_EDGES e - WHERE workspace = :workspace - AND (e.source_name = n.name OR e.target_name = n.name)) AS degree - FROM LIGHTRAG_GRAPH_NODES n - WHERE workspace = :workspace - ORDER BY priority DESC, degree DESC - FETCH FIRST :limit ROWS ONLY - """ - nodes_params = { - "workspace": self.db.workspace, - "node_label": node_label, - "limit": max_graph_nodes, - } - nodes = await self.db.query(nodes_sql, nodes_params, multirows=True) - - if not nodes: - logger.warning(f"No nodes found matching '{node_label}'") - return result - - # Create mapping of node IDs to be used to filter edges - node_names = [node["name"] for node in nodes] - - # Add nodes to result - seen_nodes = set() - for node in nodes: - node_id = node["name"] - if node_id in seen_nodes: - continue - - # Create node properties dictionary - properties = { - "entity_type": node["entity_type"], - "description": node["description"] or "", - "source_id": node["source_chunk_id"] or "", - } - - # Add node to result - result.nodes.append( - KnowledgeGraphNode( - id=node_id, labels=[node["entity_type"]], properties=properties - ) - ) - seen_nodes.add(node_id) - - # Get edges between these nodes - edges_sql = """ - SELECT source_name, target_name, weight, keywords, description, source_chunk_id - FROM LIGHTRAG_GRAPH_EDGES - WHERE workspace = :workspace - AND source_name IN (SELECT COLUMN_VALUE FROM TABLE(CAST(:node_names AS SYS.ODCIVARCHAR2LIST))) - AND target_name IN (SELECT COLUMN_VALUE FROM TABLE(CAST(:node_names AS SYS.ODCIVARCHAR2LIST))) - ORDER BY id - """ - edges_params = {"workspace": self.db.workspace, "node_names": node_names} - edges = await self.db.query(edges_sql, edges_params, multirows=True) - - # Add edges to result - seen_edges = set() - for edge in edges: - source = edge["source_name"] - target = edge["target_name"] - 
-                edge_id = f"{source}-{target}"
-
-                if edge_id in seen_edges:
-                    continue
-
-                # Create edge properties dictionary
-                properties = {
-                    "weight": edge["weight"] or 0.0,
-                    "keywords": edge["keywords"] or "",
-                    "description": edge["description"] or "",
-                    "source_id": edge["source_chunk_id"] or "",
-                }
-
-                # Add edge to result
-                result.edges.append(
-                    KnowledgeGraphEdge(
-                        id=edge_id,
-                        type="RELATED",
-                        source=source,
-                        target=target,
-                        properties=properties,
-                    )
-                )
-                seen_edges.add(edge_id)
-
-            logger.info(
-                f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
-            )
-
-        except Exception as e:
-            logger.error(f"Error retrieving knowledge graph: {e}")
-
-        return result
-
-
-N_T = {
-    NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
-    NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
-    NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
-    NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_GRAPH_NODES",
-    NameSpace.VECTOR_STORE_RELATIONSHIPS: "LIGHTRAG_GRAPH_EDGES",
-}
-
-
-def namespace_to_table_name(namespace: str) -> str:
-    for k, v in N_T.items():
-        if is_namespace(namespace, k):
-            return v
-
-
-TABLES = {
-    "LIGHTRAG_DOC_FULL": {
-        "ddl": """CREATE TABLE LIGHTRAG_DOC_FULL (
-            id varchar(256),
-            workspace varchar(1024),
-            doc_name varchar(1024),
-            content CLOB,
-            meta JSON,
-            content_summary varchar(1024),
-            content_length NUMBER,
-            status varchar(256),
-            chunks_count NUMBER,
-            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-            updatetime TIMESTAMP DEFAULT NULL,
-            error varchar(4096)
-        )"""
-    },
-    "LIGHTRAG_DOC_CHUNKS": {
-        "ddl": """CREATE TABLE LIGHTRAG_DOC_CHUNKS (
-            id varchar(256),
-            workspace varchar(1024),
-            full_doc_id varchar(256),
-            status varchar(256),
-            chunk_order_index NUMBER,
-            tokens NUMBER,
-            content CLOB,
-            content_vector VECTOR,
-            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-            updatetime TIMESTAMP DEFAULT NULL
-        )"""
-    },
-    "LIGHTRAG_GRAPH_NODES": {
-        "ddl": """CREATE TABLE LIGHTRAG_GRAPH_NODES (
-            id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
-            workspace varchar(1024),
-            name varchar(2048),
-            entity_type varchar(1024),
-            description CLOB,
-            source_chunk_id varchar(256),
-            content CLOB,
-            content_vector VECTOR,
-            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-            updatetime TIMESTAMP DEFAULT NULL
-        )"""
-    },
-    "LIGHTRAG_GRAPH_EDGES": {
-        "ddl": """CREATE TABLE LIGHTRAG_GRAPH_EDGES (
-            id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
-            workspace varchar(1024),
-            source_name varchar(2048),
-            target_name varchar(2048),
-            weight NUMBER,
-            keywords CLOB,
-            description CLOB,
-            source_chunk_id varchar(256),
-            content CLOB,
-            content_vector VECTOR,
-            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-            updatetime TIMESTAMP DEFAULT NULL
-        )"""
-    },
-    "LIGHTRAG_LLM_CACHE": {
-        "ddl": """CREATE TABLE LIGHTRAG_LLM_CACHE (
-            id varchar(256) PRIMARY KEY,
-            workspace varchar(1024),
-            cache_mode varchar(256),
-            model_name varchar(256),
-            original_prompt clob,
-            return_value clob,
-            embedding CLOB,
-            embedding_shape NUMBER,
-            embedding_min NUMBER,
-            embedding_max NUMBER,
-            createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-            updatetime TIMESTAMP DEFAULT NULL
-        )"""
-    },
-    "LIGHTRAG_GRAPH": {
-        "ddl": """CREATE OR REPLACE PROPERTY GRAPH lightrag_graph
-            VERTEX TABLES (
-                lightrag_graph_nodes KEY (id)
-                    LABEL entity
-                    PROPERTIES (id,workspace,name) -- ,entity_type,description,source_chunk_id)
-            )
-            EDGE TABLES (
-                lightrag_graph_edges KEY (id)
-                    SOURCE KEY (source_name) REFERENCES lightrag_graph_nodes(name)
-                    DESTINATION KEY (target_name) REFERENCES lightrag_graph_nodes(name)
-                    LABEL has_relation
-                    PROPERTIES (id,workspace,source_name,target_name) -- ,weight, keywords,description,source_chunk_id)
-            ) OPTIONS(ALLOW MIXED PROPERTY TYPES)"""
-    },
-}
-
-
-SQL_TEMPLATES = {
-    # SQL for KVStorage
-    "get_by_id_full_docs": "select ID,content,status from LIGHTRAG_DOC_FULL where workspace=:workspace and ID=:id",
-    "get_by_id_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID=:id",
-    "get_by_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
-        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id=:id""",
-    "get_by_mode_id_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
-        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND cache_mode=:cache_mode AND id=:id""",
-    "get_by_ids_llm_response_cache": """SELECT id, original_prompt, NVL(return_value, '') as "return", cache_mode as "mode"
-        FROM LIGHTRAG_LLM_CACHE WHERE workspace=:workspace AND id IN ({ids})""",
-    "get_by_ids_full_docs": "select t.*,createtime as created_at from LIGHTRAG_DOC_FULL t where workspace=:workspace and ID in ({ids})",
-    "get_by_ids_text_chunks": "select ID,TOKENS,content,CHUNK_ORDER_INDEX,FULL_DOC_ID from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and ID in ({ids})",
-    "get_by_status_ids_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status and ID in ({ids})",
-    "get_by_status_ids_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status ID in ({ids})",
-    "get_by_status_full_docs": "select id,status from LIGHTRAG_DOC_FULL t where workspace=:workspace AND status=:status",
-    "get_by_status_text_chunks": "select id,status from LIGHTRAG_DOC_CHUNKS where workspace=:workspace and status=:status",
-    "filter_keys": "select id from {table_name} where workspace=:workspace and id in ({ids})",
-    "merge_doc_full": """MERGE INTO LIGHTRAG_DOC_FULL a
-        USING DUAL
-        ON (a.id = :id and a.workspace = :workspace)
-        WHEN NOT MATCHED THEN
-        INSERT(id,content,workspace) values(:id,:content,:workspace)""",
-    "merge_chunk": """MERGE INTO LIGHTRAG_DOC_CHUNKS
-        USING DUAL
-        ON (id = :id and workspace = :workspace)
-        WHEN NOT MATCHED THEN INSERT
-        (id,content,workspace,tokens,chunk_order_index,full_doc_id,content_vector,status)
-        values (:id,:content,:workspace,:tokens,:chunk_order_index,:full_doc_id,:content_vector,:status) """,
-    "upsert_llm_response_cache": """MERGE INTO LIGHTRAG_LLM_CACHE a
-        USING DUAL
-        ON (a.id = :id)
-        WHEN NOT MATCHED THEN
-        INSERT (workspace,id,original_prompt,return_value,cache_mode)
-        VALUES (:workspace,:id,:original_prompt,:return_value,:cache_mode)
-        WHEN MATCHED THEN UPDATE
-        SET original_prompt = :original_prompt,
-        return_value = :return_value,
-        cache_mode = :cache_mode,
-        updatetime = SYSDATE""",
-    # SQL for VectorStorage
-    "entities": """SELECT name as entity_name FROM
-        (SELECT id,name,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
-        FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace)
-        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
-    "relationships": """SELECT source_name as src_id, target_name as tgt_id FROM
-        (SELECT id,source_name,target_name,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
-        FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace)
-        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
-    "chunks": """SELECT id FROM
-        (SELECT id,VECTOR_DISTANCE(content_vector,vector(:embedding_string,{dimension},{dtype}),COSINE) as distance
-        FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=:workspace)
-        WHERE distance>:better_than_threshold ORDER BY distance ASC FETCH FIRST :top_k ROWS ONLY""",
-    # SQL for GraphStorage
-    "has_node": """SELECT * FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)
-        WHERE a.workspace=:workspace AND a.name=:node_id
-        COLUMNS (a.name))""",
-    "has_edge": """SELECT * FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a) -[e]-> (b)
-        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
-        AND a.name=:source_node_id AND b.name=:target_node_id
-        COLUMNS (e.source_name,e.target_name) )""",
-    "node_degree": """SELECT count(1) as degree FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)-[e]->(b)
-        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
-        AND a.name=:node_id or b.name = :node_id
-        COLUMNS (a.name))""",
-    "get_node": """SELECT t1.name,t2.entity_type,t2.source_chunk_id as source_id,NVL(t2.description,'') AS description
-        FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)
-        WHERE a.workspace=:workspace AND a.name=:node_id
-        COLUMNS (a.name)
-        ) t1 JOIN LIGHTRAG_GRAPH_NODES t2 on t1.name=t2.name
-        WHERE t2.workspace=:workspace""",
-    "get_edge": """SELECT t1.source_id,t2.weight,t2.source_chunk_id as source_id,t2.keywords,
-        NVL(t2.description,'') AS description,NVL(t2.KEYWORDS,'') AS keywords
-        FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)-[e]->(b)
-        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
-        AND a.name=:source_node_id and b.name = :target_node_id
-        COLUMNS (e.id,a.name as source_id)
-        ) t1 JOIN LIGHTRAG_GRAPH_EDGES t2 on t1.id=t2.id""",
-    "get_node_edges": """SELECT source_name,target_name
-        FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)-[e]->(b)
-        WHERE e.workspace=:workspace and a.workspace=:workspace and b.workspace=:workspace
-        AND a.name=:source_node_id
-        COLUMNS (a.name as source_name,b.name as target_name))""",
-    "merge_node": """MERGE INTO LIGHTRAG_GRAPH_NODES a
-        USING DUAL
-        ON (a.workspace=:workspace and a.name=:name)
-        WHEN NOT MATCHED THEN
-        INSERT(workspace,name,entity_type,description,source_chunk_id,content,content_vector)
-        values (:workspace,:name,:entity_type,:description,:source_chunk_id,:content,:content_vector)
-        WHEN MATCHED THEN
-        UPDATE SET
-        entity_type=:entity_type,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
-    "merge_edge": """MERGE INTO LIGHTRAG_GRAPH_EDGES a
-        USING DUAL
-        ON (a.workspace=:workspace and a.source_name=:source_name and a.target_name=:target_name)
-        WHEN NOT MATCHED THEN
-        INSERT(workspace,source_name,target_name,weight,keywords,description,source_chunk_id,content,content_vector)
-        values (:workspace,:source_name,:target_name,:weight,:keywords,:description,:source_chunk_id,:content,:content_vector)
-        WHEN MATCHED THEN
-        UPDATE SET
-        weight=:weight,keywords=:keywords,description=:description,source_chunk_id=:source_chunk_id,content=:content,content_vector=:content_vector,updatetime=SYSDATE""",
-    "get_all_nodes": """WITH t0 AS (
-        SELECT name AS id, entity_type AS label, entity_type, description,
-        '["' || replace(source_chunk_id, '', '","') || '"]' source_chunk_ids
-        FROM lightrag_graph_nodes
-        WHERE workspace = :workspace
-        ORDER BY createtime DESC fetch first :limit rows only
-    ), t1 AS (
-        SELECT t0.id, source_chunk_id
-        FROM t0, JSON_TABLE ( source_chunk_ids, '$[*]' COLUMNS ( source_chunk_id PATH '$' ) )
-    ), t2 AS (
-        SELECT t1.id, LISTAGG(t2.content, '\n') content
-        FROM t1 LEFT JOIN lightrag_doc_chunks t2 ON t1.source_chunk_id = t2.id
-        GROUP BY t1.id
-    )
-    SELECT t0.id, label, entity_type, description, t2.content
-    FROM t0 LEFT JOIN t2 ON t0.id = t2.id""",
-    "get_all_edges": """SELECT t1.id,t1.keywords as label,t1.keywords, t1.source_name as source, t1.target_name as target,
-        t1.weight,t1.DESCRIPTION,t2.content
-        FROM LIGHTRAG_GRAPH_EDGES t1
-        LEFT JOIN LIGHTRAG_DOC_CHUNKS t2 on t1.source_chunk_id=t2.id
-        WHERE t1.workspace=:workspace
-        order by t1.CREATETIME DESC
-        fetch first :limit rows only""",
-    "get_statistics": """select count(distinct CASE WHEN type='node' THEN id END) as nodes_count,
-        count(distinct CASE WHEN type='edge' THEN id END) as edges_count
-        FROM (
-        select 'node' as type, id FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a) WHERE a.workspace=:workspace columns(a.name as id))
-        UNION
-        select 'edge' as type, TO_CHAR(id) id FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)-[e]->(b) WHERE e.workspace=:workspace columns(e.id))
-        )""",
-    # SQL for deletion
-    "delete_vectors": "DELETE FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=:workspace AND id IN ({ids})",
-    "delete_entity": "DELETE FROM LIGHTRAG_GRAPH_NODES WHERE workspace=:workspace AND name=:entity_name",
-    "delete_entity_relations": "DELETE FROM LIGHTRAG_GRAPH_EDGES WHERE workspace=:workspace AND (source_name=:entity_name OR target_name=:entity_name)",
-    "delete_node": """DELETE FROM GRAPH_TABLE (lightrag_graph
-        MATCH (a)
-        WHERE a.workspace=:workspace AND a.name=:node_id
-        ACTION DELETE a)""",
-    # Drop tables
-    "drop_specifiy_table_workspace": "DELETE FROM {table_name} WHERE workspace=:workspace",
-}