From e08905b398a83c07bb1b23c4ce269a8b275bde83 Mon Sep 17 00:00:00 2001 From: hyb <468949484@qq.com> Date: Wed, 22 Jan 2025 16:42:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0redis=20KV=E5=AD=98?= =?UTF-8?q?=E5=82=A8=EF=BC=8C=E5=A2=9E=E5=8A=A0openai+neo4j+milvus+redis?= =?UTF-8?q?=E7=9A=84demo=E6=B5=8B=E8=AF=95=EF=BC=8C=E6=96=B0=E5=A2=9Elight?= =?UTF-8?q?rag.py:=20RedisKVStorage=EF=BC=8C=E6=96=B0=E5=A2=9Erequirements?= =?UTF-8?q?.txt:aioredis=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...lightrag_openai_neo4j_milvus_redis_demo.py | 69 +++++++++++++++++++ lightrag/kg/redis_impl.py | 64 +++++++++++++++++ lightrag/lightrag.py | 1 + requirements.txt | 1 + 4 files changed, 135 insertions(+) create mode 100644 examples/lightrag_openai_neo4j_milvus_redis_demo.py create mode 100644 lightrag/kg/redis_impl.py diff --git a/examples/lightrag_openai_neo4j_milvus_redis_demo.py b/examples/lightrag_openai_neo4j_milvus_redis_demo.py new file mode 100644 index 00000000..156a711c --- /dev/null +++ b/examples/lightrag_openai_neo4j_milvus_redis_demo.py @@ -0,0 +1,69 @@ +import os +from lightrag import LightRAG, QueryParam +from lightrag.llm import ollama_embed, openai_complete_if_cache +from lightrag.utils import EmbeddingFunc + +# WorkingDir +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) +WORKING_DIR = os.path.join(ROOT_DIR, "myKG") +if not os.path.exists(WORKING_DIR): + os.mkdir(WORKING_DIR) +print(f"WorkingDir: {WORKING_DIR}") + +# redis +os.environ["REDIS_URI"] = "redis://localhost:6379" + +# neo4j +BATCH_SIZE_NODES = 500 +BATCH_SIZE_EDGES = 100 +os.environ["NEO4J_URI"] = "bolt://117.50.173.35:7687" +os.environ["NEO4J_USERNAME"] = "neo4j" +os.environ["NEO4J_PASSWORD"] = "12345678" + +# milvus +os.environ["MILVUS_URI"] = "http://117.50.173.35:19530" +os.environ["MILVUS_USER"] = "root" +os.environ["MILVUS_PASSWORD"] = "Milvus" +os.environ["MILVUS_DB_NAME"] = "lightrag" + + +async def llm_model_func( + prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs +) -> str: + return await openai_complete_if_cache( + "deepseek-chat", + prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key="sk-91d0b59f25554251aa813ed756d79a6d", + base_url="https://api.deepseek.com", + **kwargs, + ) + + +embedding_func = EmbeddingFunc( + embedding_dim=768, + max_token_size=512, + func=lambda texts: ollama_embed( + texts, embed_model="shaw/dmeta-embedding-zh", host="http://117.50.173.35:11434" + ), +) + +rag = LightRAG( + working_dir=WORKING_DIR, + llm_model_func=llm_model_func, + llm_model_max_token_size=32768, + embedding_func=embedding_func, + chunk_token_size=512, + chunk_overlap_token_size=256, + kv_storage="RedisKVStorage", + graph_storage="Neo4JStorage", + vector_storage="MilvusVectorDBStorge", + doc_status_storage="RedisKVStorage", +) + +file = "../book.txt" +with open(file, "r", encoding="utf-8") as f: + rag.insert(f.read()) + +print(rag.query("谁会3D建模 ?", param=QueryParam(mode="mix"))) diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py new file mode 100644 index 00000000..a861cb26 --- /dev/null +++ b/lightrag/kg/redis_impl.py @@ -0,0 +1,64 @@ +import os +from tqdm.asyncio import tqdm as tqdm_async +from dataclasses import dataclass +import aioredis +from lightrag.utils import logger +from lightrag.base import BaseKVStorage +import json + + +@dataclass +class RedisKVStorage(BaseKVStorage): + def __post_init__(self): + redis_url = os.environ.get("REDIS_URI", "redis://localhost:6379") + self._redis = aioredis.from_url(redis_url, decode_responses=True) + logger.info(f"Use Redis as KV {self.namespace}") + + async def all_keys(self) -> list[str]: + keys = await self._redis.keys(f"{self.namespace}:*") + return [key.split(":", 1)[-1] for key in keys] + + async def get_by_id(self, id): + data = await self._redis.get(f"{self.namespace}:{id}") + return json.loads(data) if data else None + + async def get_by_ids(self, ids, fields=None): + pipe = self._redis.pipeline() + for id in ids: + pipe.get(f"{self.namespace}:{id}") + results = await pipe.execute() + + if fields: + # Filter fields if specified + return [ + {field: value.get(field) for field in fields if field in value} + if (value := json.loads(result)) + else None + for result in results + ] + + return [json.loads(result) if result else None for result in results] + + async def filter_keys(self, data: list[str]) -> set[str]: + pipe = self._redis.pipeline() + for key in data: + pipe.exists(f"{self.namespace}:{key}") + results = await pipe.execute() + + existing_ids = {data[i] for i, exists in enumerate(results) if exists} + return set(data) - existing_ids + + async def upsert(self, data: dict[str, dict]): + pipe = self._redis.pipeline() + for k, v in tqdm_async(data.items(), desc="Upserting"): + pipe.set(f"{self.namespace}:{k}", json.dumps(v)) + await pipe.execute() + + for k in data: + data[k]["_id"] = k + return data + + async def drop(self): + keys = await self._redis.keys(f"{self.namespace}:*") + if keys: + await self._redis.delete(*keys) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ad79afaa..edec85f1 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -52,6 +52,7 @@ STORAGES = { "OracleVectorDBStorage": ".kg.oracle_impl", "MilvusVectorDBStorge": ".kg.milvus_impl", "MongoKVStorage": ".kg.mongo_impl", + "RedisKVStorage": ".kg.redis_impl", "ChromaVectorDBStorage": ".kg.chroma_impl", "TiDBKVStorage": ".kg.tidb_impl", "TiDBVectorDBStorage": ".kg.tidb_impl", diff --git a/requirements.txt b/requirements.txt index 48c25ff8..d2d6da20 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ accelerate aioboto3 aiofiles aiohttp +aioredis asyncpg # database packages