Remove Oracle storage implementation
This commit is contained in:
@@ -1,267 +0,0 @@
|
||||
from fastapi import FastAPI, HTTPException, File, UploadFile
|
||||
from fastapi import Query
|
||||
from contextlib import asynccontextmanager
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional, Any
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import asyncio
|
||||
import nest_asyncio
|
||||
from lightrag import LightRAG, QueryParam
|
||||
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
|
||||
from lightrag.utils import EmbeddingFunc
|
||||
import numpy as np
|
||||
from lightrag.kg.shared_storage import initialize_pipeline_status
|
||||
|
||||
|
||||
print(os.getcwd())
|
||||
script_directory = Path(__file__).resolve().parent.parent
|
||||
sys.path.append(os.path.abspath(script_directory))
|
||||
|
||||
|
||||
# Apply nest_asyncio to solve event loop issues
|
||||
nest_asyncio.apply()
|
||||
|
||||
DEFAULT_RAG_DIR = "index_default"
|
||||
|
||||
|
||||
# We use OpenAI compatible API to call LLM on Oracle Cloud
|
||||
# More docs here https://github.com/jin38324/OCI_GenAI_access_gateway
|
||||
BASE_URL = "http://xxx.xxx.xxx.xxx:8088/v1/"
|
||||
APIKEY = "ocigenerativeai"
|
||||
|
||||
# Configure working directory
|
||||
WORKING_DIR = os.environ.get("RAG_DIR", f"{DEFAULT_RAG_DIR}")
|
||||
print(f"WORKING_DIR: {WORKING_DIR}")
|
||||
LLM_MODEL = os.environ.get("LLM_MODEL", "cohere.command-r-plus-08-2024")
|
||||
print(f"LLM_MODEL: {LLM_MODEL}")
|
||||
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "cohere.embed-multilingual-v3.0")
|
||||
print(f"EMBEDDING_MODEL: {EMBEDDING_MODEL}")
|
||||
EMBEDDING_MAX_TOKEN_SIZE = int(os.environ.get("EMBEDDING_MAX_TOKEN_SIZE", 512))
|
||||
print(f"EMBEDDING_MAX_TOKEN_SIZE: {EMBEDDING_MAX_TOKEN_SIZE}")
|
||||
|
||||
if not os.path.exists(WORKING_DIR):
|
||||
os.mkdir(WORKING_DIR)
|
||||
|
||||
os.environ["ORACLE_USER"] = ""
|
||||
os.environ["ORACLE_PASSWORD"] = ""
|
||||
os.environ["ORACLE_DSN"] = ""
|
||||
os.environ["ORACLE_CONFIG_DIR"] = "path_to_config_dir"
|
||||
os.environ["ORACLE_WALLET_LOCATION"] = "path_to_wallet_location"
|
||||
os.environ["ORACLE_WALLET_PASSWORD"] = "wallet_password"
|
||||
os.environ["ORACLE_WORKSPACE"] = "company"
|
||||
|
||||
|
||||
async def llm_model_func(
|
||||
prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
|
||||
) -> str:
|
||||
return await openai_complete_if_cache(
|
||||
LLM_MODEL,
|
||||
prompt,
|
||||
system_prompt=system_prompt,
|
||||
history_messages=history_messages,
|
||||
api_key=APIKEY,
|
||||
base_url=BASE_URL,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
async def embedding_func(texts: list[str]) -> np.ndarray:
|
||||
return await openai_embed(
|
||||
texts,
|
||||
model=EMBEDDING_MODEL,
|
||||
api_key=APIKEY,
|
||||
base_url=BASE_URL,
|
||||
)
|
||||
|
||||
|
||||
async def get_embedding_dim():
|
||||
test_text = ["This is a test sentence."]
|
||||
embedding = await embedding_func(test_text)
|
||||
embedding_dim = embedding.shape[1]
|
||||
return embedding_dim
|
||||
|
||||
|
||||
async def init():
|
||||
# Detect embedding dimension
|
||||
embedding_dimension = await get_embedding_dim()
|
||||
print(f"Detected embedding dimension: {embedding_dimension}")
|
||||
# Create Oracle DB connection
|
||||
# The `config` parameter is the connection configuration of Oracle DB
|
||||
# More docs here https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html
|
||||
# We storage data in unified tables, so we need to set a `workspace` parameter to specify which docs we want to store and query
|
||||
# Below is an example of how to connect to Oracle Autonomous Database on Oracle Cloud
|
||||
|
||||
# Initialize LightRAG
|
||||
# We use Oracle DB as the KV/vector/graph storage
|
||||
rag = LightRAG(
|
||||
enable_llm_cache=False,
|
||||
working_dir=WORKING_DIR,
|
||||
chunk_token_size=512,
|
||||
llm_model_func=llm_model_func,
|
||||
embedding_func=EmbeddingFunc(
|
||||
embedding_dim=embedding_dimension,
|
||||
max_token_size=512,
|
||||
func=embedding_func,
|
||||
),
|
||||
graph_storage="OracleGraphStorage",
|
||||
kv_storage="OracleKVStorage",
|
||||
vector_storage="OracleVectorDBStorage",
|
||||
)
|
||||
|
||||
await rag.initialize_storages()
|
||||
await initialize_pipeline_status()
|
||||
|
||||
return rag
|
||||
|
||||
|
||||
# Extract and Insert into LightRAG storage
|
||||
# with open("./dickens/book.txt", "r", encoding="utf-8") as f:
|
||||
# await rag.ainsert(f.read())
|
||||
|
||||
# # Perform search in different modes
|
||||
# modes = ["naive", "local", "global", "hybrid"]
|
||||
# for mode in modes:
|
||||
# print("="*20, mode, "="*20)
|
||||
# print(await rag.aquery("这篇文档是关于什么内容的?", param=QueryParam(mode=mode)))
|
||||
# print("-"*100, "\n")
|
||||
|
||||
# Data models
|
||||
|
||||
|
||||
class QueryRequest(BaseModel):
|
||||
query: str
|
||||
mode: str = "hybrid"
|
||||
only_need_context: bool = False
|
||||
only_need_prompt: bool = False
|
||||
|
||||
|
||||
class DataRequest(BaseModel):
|
||||
limit: int = 100
|
||||
|
||||
|
||||
class InsertRequest(BaseModel):
|
||||
text: str
|
||||
|
||||
|
||||
class Response(BaseModel):
|
||||
status: str
|
||||
data: Optional[Any] = None
|
||||
message: Optional[str] = None
|
||||
|
||||
|
||||
# API routes
|
||||
|
||||
rag = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global rag
|
||||
rag = await init()
|
||||
print("done!")
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="LightRAG API", description="API for RAG operations", lifespan=lifespan
|
||||
)
|
||||
|
||||
|
||||
@app.post("/query", response_model=Response)
|
||||
async def query_endpoint(request: QueryRequest):
|
||||
# try:
|
||||
# loop = asyncio.get_event_loop()
|
||||
if request.mode == "naive":
|
||||
top_k = 3
|
||||
else:
|
||||
top_k = 60
|
||||
result = await rag.aquery(
|
||||
request.query,
|
||||
param=QueryParam(
|
||||
mode=request.mode,
|
||||
only_need_context=request.only_need_context,
|
||||
only_need_prompt=request.only_need_prompt,
|
||||
top_k=top_k,
|
||||
),
|
||||
)
|
||||
return Response(status="success", data=result)
|
||||
# except Exception as e:
|
||||
# raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/data", response_model=Response)
|
||||
async def query_all_nodes(type: str = Query("nodes"), limit: int = Query(100)):
|
||||
if type == "nodes":
|
||||
result = await rag.chunk_entity_relation_graph.get_all_nodes(limit=limit)
|
||||
elif type == "edges":
|
||||
result = await rag.chunk_entity_relation_graph.get_all_edges(limit=limit)
|
||||
elif type == "statistics":
|
||||
result = await rag.chunk_entity_relation_graph.get_statistics()
|
||||
return Response(status="success", data=result)
|
||||
|
||||
|
||||
@app.post("/insert", response_model=Response)
|
||||
async def insert_endpoint(request: InsertRequest):
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, lambda: rag.insert(request.text))
|
||||
return Response(status="success", message="Text inserted successfully")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.post("/insert_file", response_model=Response)
|
||||
async def insert_file(file: UploadFile = File(...)):
|
||||
try:
|
||||
file_content = await file.read()
|
||||
# Read file content
|
||||
try:
|
||||
content = file_content.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
# If UTF-8 decoding fails, try other encodings
|
||||
content = file_content.decode("gbk")
|
||||
# Insert file content
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, lambda: rag.insert(content))
|
||||
|
||||
return Response(
|
||||
status="success",
|
||||
message=f"File content from {file.filename} inserted successfully",
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
return {"status": "healthy"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(app, host="127.0.0.1", port=8020)
|
||||
|
||||
# Usage example
|
||||
# To run the server, use the following command in your terminal:
|
||||
# python lightrag_api_openai_compatible_demo.py
|
||||
|
||||
# Example requests:
|
||||
# 1. Query:
|
||||
# curl -X POST "http://127.0.0.1:8020/query" -H "Content-Type: application/json" -d '{"query": "your query here", "mode": "hybrid"}'
|
||||
|
||||
# 2. Insert text:
|
||||
# curl -X POST "http://127.0.0.1:8020/insert" -H "Content-Type: application/json" -d '{"text": "your text here"}'
|
||||
|
||||
# 3. Insert file:
|
||||
# curl -X POST "http://127.0.0.1:8020/insert_file" -H "Content-Type: multipart/form-data" -F "file=@path/to/your/file.txt"
|
||||
|
||||
|
||||
# 4. Health check:
|
||||
# curl -X GET "http://127.0.0.1:8020/health"
|
@@ -1,141 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
import asyncio
|
||||
from lightrag import LightRAG, QueryParam
|
||||
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
|
||||
from lightrag.utils import EmbeddingFunc
|
||||
import numpy as np
|
||||
from lightrag.kg.shared_storage import initialize_pipeline_status
|
||||
|
||||
print(os.getcwd())
|
||||
script_directory = Path(__file__).resolve().parent.parent
|
||||
sys.path.append(os.path.abspath(script_directory))
|
||||
|
||||
WORKING_DIR = "./dickens"
|
||||
|
||||
# We use OpenAI compatible API to call LLM on Oracle Cloud
|
||||
# More docs here https://github.com/jin38324/OCI_GenAI_access_gateway
|
||||
BASE_URL = "http://xxx.xxx.xxx.xxx:8088/v1/"
|
||||
APIKEY = "ocigenerativeai"
|
||||
CHATMODEL = "cohere.command-r-plus"
|
||||
EMBEDMODEL = "cohere.embed-multilingual-v3.0"
|
||||
CHUNK_TOKEN_SIZE = 1024
|
||||
MAX_TOKENS = 4000
|
||||
|
||||
if not os.path.exists(WORKING_DIR):
|
||||
os.mkdir(WORKING_DIR)
|
||||
|
||||
os.environ["ORACLE_USER"] = "username"
|
||||
os.environ["ORACLE_PASSWORD"] = "xxxxxxxxx"
|
||||
os.environ["ORACLE_DSN"] = "xxxxxxx_medium"
|
||||
os.environ["ORACLE_CONFIG_DIR"] = "path_to_config_dir"
|
||||
os.environ["ORACLE_WALLET_LOCATION"] = "path_to_wallet_location"
|
||||
os.environ["ORACLE_WALLET_PASSWORD"] = "wallet_password"
|
||||
os.environ["ORACLE_WORKSPACE"] = "company"
|
||||
|
||||
|
||||
async def llm_model_func(
|
||||
prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
|
||||
) -> str:
|
||||
return await openai_complete_if_cache(
|
||||
CHATMODEL,
|
||||
prompt,
|
||||
system_prompt=system_prompt,
|
||||
history_messages=history_messages,
|
||||
api_key=APIKEY,
|
||||
base_url=BASE_URL,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
async def embedding_func(texts: list[str]) -> np.ndarray:
|
||||
return await openai_embed(
|
||||
texts,
|
||||
model=EMBEDMODEL,
|
||||
api_key=APIKEY,
|
||||
base_url=BASE_URL,
|
||||
)
|
||||
|
||||
|
||||
async def get_embedding_dim():
|
||||
test_text = ["This is a test sentence."]
|
||||
embedding = await embedding_func(test_text)
|
||||
embedding_dim = embedding.shape[1]
|
||||
return embedding_dim
|
||||
|
||||
|
||||
async def initialize_rag():
|
||||
# Detect embedding dimension
|
||||
embedding_dimension = await get_embedding_dim()
|
||||
print(f"Detected embedding dimension: {embedding_dimension}")
|
||||
|
||||
# Initialize LightRAG
|
||||
# We use Oracle DB as the KV/vector/graph storage
|
||||
# You can add `addon_params={"example_number": 1, "language": "Simplfied Chinese"}` to control the prompt
|
||||
rag = LightRAG(
|
||||
# log_level="DEBUG",
|
||||
working_dir=WORKING_DIR,
|
||||
entity_extract_max_gleaning=1,
|
||||
enable_llm_cache=True,
|
||||
enable_llm_cache_for_entity_extract=True,
|
||||
embedding_cache_config=None, # {"enabled": True,"similarity_threshold": 0.90},
|
||||
chunk_token_size=CHUNK_TOKEN_SIZE,
|
||||
llm_model_max_token_size=MAX_TOKENS,
|
||||
llm_model_func=llm_model_func,
|
||||
embedding_func=EmbeddingFunc(
|
||||
embedding_dim=embedding_dimension,
|
||||
max_token_size=500,
|
||||
func=embedding_func,
|
||||
),
|
||||
graph_storage="OracleGraphStorage",
|
||||
kv_storage="OracleKVStorage",
|
||||
vector_storage="OracleVectorDBStorage",
|
||||
addon_params={
|
||||
"example_number": 1,
|
||||
"language": "Simplfied Chinese",
|
||||
"entity_types": ["organization", "person", "geo", "event"],
|
||||
"insert_batch_size": 2,
|
||||
},
|
||||
)
|
||||
await rag.initialize_storages()
|
||||
await initialize_pipeline_status()
|
||||
|
||||
return rag
|
||||
|
||||
|
||||
async def main():
|
||||
try:
|
||||
# Initialize RAG instance
|
||||
rag = await initialize_rag()
|
||||
|
||||
# Extract and Insert into LightRAG storage
|
||||
with open(WORKING_DIR + "/docs.txt", "r", encoding="utf-8") as f:
|
||||
all_text = f.read()
|
||||
texts = [x for x in all_text.split("\n") if x]
|
||||
|
||||
# New mode use pipeline
|
||||
await rag.apipeline_enqueue_documents(texts)
|
||||
await rag.apipeline_process_enqueue_documents()
|
||||
|
||||
# Old method use ainsert
|
||||
# await rag.ainsert(texts)
|
||||
|
||||
# Perform search in different modes
|
||||
modes = ["naive", "local", "global", "hybrid"]
|
||||
for mode in modes:
|
||||
print("=" * 20, mode, "=" * 20)
|
||||
print(
|
||||
await rag.aquery(
|
||||
"What are the top themes in this story?",
|
||||
param=QueryParam(mode=mode),
|
||||
)
|
||||
)
|
||||
print("-" * 100, "\n")
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
Reference in New Issue
Block a user