Merge pull request #589 from ShanGor/main
Enrich README.md for postgres usage; fix the issue for running postgres on Python version < 3.12
This commit is contained in:
@@ -361,6 +361,7 @@ see test_neo4j.py for a working example.
|
|||||||
### Using PostgreSQL for Storage
|
### Using PostgreSQL for Storage
|
||||||
For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE).
|
For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE).
|
||||||
* PostgreSQL is lightweight; the whole binary distribution including all necessary plugins can be zipped to 40MB: Ref to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0). It is easy to install for Linux/Mac.
|
* PostgreSQL is lightweight; the whole binary distribution including all necessary plugins can be zipped to 40MB: Ref to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0). It is easy to install for Linux/Mac.
|
||||||
|
* If you prefer docker, please start with this image if you are a beginner to avoid hiccups (DO read the overview): https://hub.docker.com/r/shangor/postgres-for-rag
|
||||||
* How to start? Ref to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py)
|
* How to start? Ref to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py)
|
||||||
* Create index for AGE example: (Change below `dickens` to your graph name if necessary)
|
* Create index for AGE example: (Change below `dickens` to your graph name if necessary)
|
||||||
```
|
```
|
||||||
|
97
examples/copy_llm_cache_to_another_storage.py
Normal file
97
examples/copy_llm_cache_to_another_storage.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
"""
|
||||||
|
Sometimes you need to switch a storage solution, but you want to save LLM token and time.
|
||||||
|
This handy script helps you to copy the LLM caches from one storage solution to another.
|
||||||
|
(Not all the storage impl are supported)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from lightrag.kg.postgres_impl import PostgreSQLDB, PGKVStorage
|
||||||
|
from lightrag.storage import JsonKVStorage
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
ROOT_DIR = os.environ.get("ROOT_DIR")
|
||||||
|
WORKING_DIR = f"{ROOT_DIR}/dickens"
|
||||||
|
|
||||||
|
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
|
||||||
|
|
||||||
|
if not os.path.exists(WORKING_DIR):
|
||||||
|
os.mkdir(WORKING_DIR)
|
||||||
|
|
||||||
|
# AGE
|
||||||
|
os.environ["AGE_GRAPH_NAME"] = "chinese"
|
||||||
|
|
||||||
|
postgres_db = PostgreSQLDB(
|
||||||
|
config={
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 15432,
|
||||||
|
"user": "rag",
|
||||||
|
"password": "rag",
|
||||||
|
"database": "r2",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def copy_from_postgres_to_json():
    """Copy every LLM response cache entry from PostgreSQL into JsonKVStorage.

    Enumerates all (workspace, mode, id) keys from the Postgres cache table,
    fetches each cached object, accumulates them into one mapping, then
    writes it to the JSON storage with a single upsert and flushes via
    index_done_callback().
    """
    await postgres_db.initdb()

    from_llm_response_cache = PGKVStorage(
        namespace="llm_response_cache",
        global_config={"embedding_batch_num": 6},
        embedding_func=None,
        db=postgres_db,
    )

    to_llm_response_cache = JsonKVStorage(
        namespace="llm_response_cache",
        global_config={"working_dir": WORKING_DIR},
        embedding_func=None,
    )

    kv = {}
    for c_id in await from_llm_response_cache.all_keys():
        print(f"Copying {c_id}")
        workspace = c_id["workspace"]
        mode = c_id["mode"]
        _id = c_id["id"]
        # get_by_mode_and_id scopes its lookup by the workspace set on the db handle.
        postgres_db.workspace = workspace
        obj = await from_llm_response_cache.get_by_mode_and_id(mode, _id)
        if obj is None or _id not in obj:
            # Skip rows that vanished or came back empty instead of crashing
            # mid-copy with a TypeError/KeyError (the original did obj[_id]
            # unguarded).
            print(f"Skipping missing cache entry {c_id}")
            continue
        kv.setdefault(mode, {})[_id] = obj[_id]
        print(f"Object {obj}")
    await to_llm_response_cache.upsert(kv)
    await to_llm_response_cache.index_done_callback()
    print("Mission accomplished!")
|
||||||
|
|
||||||
|
|
||||||
|
async def copy_from_json_to_postgres():
    """Copy every LLM response cache entry from JsonKVStorage into PostgreSQL.

    Iterates the cache modes stored in the JSON working dir and upserts each
    (mode, key, value) entry into the Postgres-backed cache one at a time.
    """
    await postgres_db.initdb()

    from_llm_response_cache = JsonKVStorage(
        namespace="llm_response_cache",
        global_config={"working_dir": WORKING_DIR},
        embedding_func=None,
    )

    to_llm_response_cache = PGKVStorage(
        namespace="llm_response_cache",
        global_config={"embedding_batch_num": 6},
        embedding_func=None,
        db=postgres_db,
    )

    for mode in await from_llm_response_cache.all_keys():
        print(f"Copying {mode}")
        caches = await from_llm_response_cache.get_by_id(mode)
        if not caches:
            # Nothing stored under this mode — skip instead of calling
            # .items() on None (the original crashed here on a miss).
            continue
        for k, v in caches.items():
            item = {mode: {k: v}}
            print(f"\tCopying {item}")
            await to_llm_response_cache.upsert(item)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(copy_from_json_to_postgres())
|
@@ -231,6 +231,16 @@ class PGKVStorage(BaseKVStorage):
|
|||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def all_keys(self) -> list[dict]:
    """Return every cache key as a list of {"workspace", "mode", "id"} dicts.

    Only implemented for the ``llm_response_cache`` namespace; for any other
    namespace an error is logged and an empty list is returned.
    """
    if self.namespace == "llm_response_cache":
        sql = "select workspace,mode,id from lightrag_llm_cache"
        res = await self.db.query(sql, multirows=True)
        return res
    logger.error(
        f"all_keys is only implemented for llm_response_cache, not for {self.namespace}"
    )
    # Honor the declared return type instead of implicitly returning None,
    # which crashed callers iterating the result.
    return []
|
||||||
|
|
||||||
async def filter_keys(self, keys: List[str]) -> Set[str]:
|
async def filter_keys(self, keys: List[str]) -> Set[str]:
|
||||||
"""Filter out duplicated content"""
|
"""Filter out duplicated content"""
|
||||||
sql = SQL_TEMPLATES["filter_keys"].format(
|
sql = SQL_TEMPLATES["filter_keys"].format(
|
||||||
@@ -412,7 +422,10 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
|
|
||||||
async def filter_keys(self, data: list[str]) -> set[str]:
|
async def filter_keys(self, data: list[str]) -> set[str]:
|
||||||
"""Return keys that don't exist in storage"""
|
"""Return keys that don't exist in storage"""
|
||||||
sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({",".join([f"'{_id}'" for _id in data])})"
|
keys = ",".join([f"'{_id}'" for _id in data])
|
||||||
|
sql = (
|
||||||
|
f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({keys})"
|
||||||
|
)
|
||||||
result = await self.db.query(sql, {"workspace": self.db.workspace}, True)
|
result = await self.db.query(sql, {"workspace": self.db.workspace}, True)
|
||||||
# The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
|
# The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
|
||||||
if result is None:
|
if result is None:
|
||||||
|
Reference in New Issue
Block a user