feat: 增加redis KV存储,增加openai+neo4j+milvus+redis的demo测试,新增lightrag.py: RedisKVStorage,新增requirements.txt:aioredis依赖
This commit is contained in:
69
examples/lightrag_openai_neo4j_milvus_redis_demo.py
Normal file
69
examples/lightrag_openai_neo4j_milvus_redis_demo.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import os
|
||||
from lightrag import LightRAG, QueryParam
|
||||
from lightrag.llm import ollama_embed, openai_complete_if_cache
|
||||
from lightrag.utils import EmbeddingFunc
|
||||
|
||||
# WorkingDir
|
||||
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
WORKING_DIR = os.path.join(ROOT_DIR, "myKG")
|
||||
if not os.path.exists(WORKING_DIR):
|
||||
os.mkdir(WORKING_DIR)
|
||||
print(f"WorkingDir: {WORKING_DIR}")
|
||||
|
||||
# redis
|
||||
os.environ["REDIS_URI"] = "redis://localhost:6379"
|
||||
|
||||
# neo4j
|
||||
BATCH_SIZE_NODES = 500
|
||||
BATCH_SIZE_EDGES = 100
|
||||
os.environ["NEO4J_URI"] = "bolt://117.50.173.35:7687"
|
||||
os.environ["NEO4J_USERNAME"] = "neo4j"
|
||||
os.environ["NEO4J_PASSWORD"] = "12345678"
|
||||
|
||||
# milvus
|
||||
os.environ["MILVUS_URI"] = "http://117.50.173.35:19530"
|
||||
os.environ["MILVUS_USER"] = "root"
|
||||
os.environ["MILVUS_PASSWORD"] = "Milvus"
|
||||
os.environ["MILVUS_DB_NAME"] = "lightrag"
|
||||
|
||||
|
||||
async def llm_model_func(
|
||||
prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
|
||||
) -> str:
|
||||
return await openai_complete_if_cache(
|
||||
"deepseek-chat",
|
||||
prompt,
|
||||
system_prompt=system_prompt,
|
||||
history_messages=history_messages,
|
||||
api_key="sk-91d0b59f25554251aa813ed756d79a6d",
|
||||
base_url="https://api.deepseek.com",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
embedding_func = EmbeddingFunc(
|
||||
embedding_dim=768,
|
||||
max_token_size=512,
|
||||
func=lambda texts: ollama_embed(
|
||||
texts, embed_model="shaw/dmeta-embedding-zh", host="http://117.50.173.35:11434"
|
||||
),
|
||||
)
|
||||
|
||||
rag = LightRAG(
|
||||
working_dir=WORKING_DIR,
|
||||
llm_model_func=llm_model_func,
|
||||
llm_model_max_token_size=32768,
|
||||
embedding_func=embedding_func,
|
||||
chunk_token_size=512,
|
||||
chunk_overlap_token_size=256,
|
||||
kv_storage="RedisKVStorage",
|
||||
graph_storage="Neo4JStorage",
|
||||
vector_storage="MilvusVectorDBStorge",
|
||||
doc_status_storage="RedisKVStorage",
|
||||
)
|
||||
|
||||
file = "../book.txt"
|
||||
with open(file, "r", encoding="utf-8") as f:
|
||||
rag.insert(f.read())
|
||||
|
||||
print(rag.query("谁会3D建模 ?", param=QueryParam(mode="mix")))
|
64
lightrag/kg/redis_impl.py
Normal file
64
lightrag/kg/redis_impl.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import os
|
||||
from tqdm.asyncio import tqdm as tqdm_async
|
||||
from dataclasses import dataclass
|
||||
import aioredis
|
||||
from lightrag.utils import logger
|
||||
from lightrag.base import BaseKVStorage
|
||||
import json
|
||||
|
||||
|
||||
@dataclass
|
||||
class RedisKVStorage(BaseKVStorage):
|
||||
def __post_init__(self):
|
||||
redis_url = os.environ.get("REDIS_URI", "redis://localhost:6379")
|
||||
self._redis = aioredis.from_url(redis_url, decode_responses=True)
|
||||
logger.info(f"Use Redis as KV {self.namespace}")
|
||||
|
||||
async def all_keys(self) -> list[str]:
|
||||
keys = await self._redis.keys(f"{self.namespace}:*")
|
||||
return [key.split(":", 1)[-1] for key in keys]
|
||||
|
||||
async def get_by_id(self, id):
|
||||
data = await self._redis.get(f"{self.namespace}:{id}")
|
||||
return json.loads(data) if data else None
|
||||
|
||||
async def get_by_ids(self, ids, fields=None):
|
||||
pipe = self._redis.pipeline()
|
||||
for id in ids:
|
||||
pipe.get(f"{self.namespace}:{id}")
|
||||
results = await pipe.execute()
|
||||
|
||||
if fields:
|
||||
# Filter fields if specified
|
||||
return [
|
||||
{field: value.get(field) for field in fields if field in value}
|
||||
if (value := json.loads(result))
|
||||
else None
|
||||
for result in results
|
||||
]
|
||||
|
||||
return [json.loads(result) if result else None for result in results]
|
||||
|
||||
async def filter_keys(self, data: list[str]) -> set[str]:
|
||||
pipe = self._redis.pipeline()
|
||||
for key in data:
|
||||
pipe.exists(f"{self.namespace}:{key}")
|
||||
results = await pipe.execute()
|
||||
|
||||
existing_ids = {data[i] for i, exists in enumerate(results) if exists}
|
||||
return set(data) - existing_ids
|
||||
|
||||
async def upsert(self, data: dict[str, dict]):
|
||||
pipe = self._redis.pipeline()
|
||||
for k, v in tqdm_async(data.items(), desc="Upserting"):
|
||||
pipe.set(f"{self.namespace}:{k}", json.dumps(v))
|
||||
await pipe.execute()
|
||||
|
||||
for k in data:
|
||||
data[k]["_id"] = k
|
||||
return data
|
||||
|
||||
async def drop(self):
|
||||
keys = await self._redis.keys(f"{self.namespace}:*")
|
||||
if keys:
|
||||
await self._redis.delete(*keys)
|
@@ -52,6 +52,7 @@ STORAGES = {
|
||||
"OracleVectorDBStorage": ".kg.oracle_impl",
|
||||
"MilvusVectorDBStorge": ".kg.milvus_impl",
|
||||
"MongoKVStorage": ".kg.mongo_impl",
|
||||
"RedisKVStorage": ".kg.redis_impl",
|
||||
"ChromaVectorDBStorage": ".kg.chroma_impl",
|
||||
"TiDBKVStorage": ".kg.tidb_impl",
|
||||
"TiDBVectorDBStorage": ".kg.tidb_impl",
|
||||
|
@@ -2,6 +2,7 @@ accelerate
|
||||
aioboto3
|
||||
aiofiles
|
||||
aiohttp
|
||||
aioredis
|
||||
asyncpg
|
||||
|
||||
# database packages
|
||||
|
Reference in New Issue
Block a user