diff --git a/README.md b/README.md index 71248056..da1a1d56 100644 --- a/README.md +++ b/README.md @@ -361,6 +361,7 @@ see test_neo4j.py for a working example. ### Using PostgreSQL for Storage For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE). * PostgreSQL is lightweight,the whole binary distribution including all necessary plugins can be zipped to 40MB: Ref to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0) as it is easy to install for Linux/Mac. +* If you prefer Docker and are a beginner, start with this image to avoid hiccups (DO read the overview): https://hub.docker.com/r/shangor/postgres-for-rag +* How to start? Refer to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py) +* Create index for AGE example: (Change below `dickens` to your graph name if necessary) ``` diff --git a/examples/copy_llm_cache_to_another_storage.py b/examples/copy_llm_cache_to_another_storage.py new file mode 100644 index 00000000..b9378c7c --- /dev/null +++ b/examples/copy_llm_cache_to_another_storage.py @@ -0,0 +1,97 @@ +""" +Sometimes you need to switch a storage solution, but you want to save LLM tokens and time. +This handy script helps you to copy the LLM caches from one storage solution to another. 
+(Not all the storage impl are supported) +""" + +import asyncio +import logging +import os +from dotenv import load_dotenv + +from lightrag.kg.postgres_impl import PostgreSQLDB, PGKVStorage +from lightrag.storage import JsonKVStorage + +load_dotenv() +ROOT_DIR = os.environ.get("ROOT_DIR") +WORKING_DIR = f"{ROOT_DIR}/dickens" + +logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) + +if not os.path.exists(WORKING_DIR): + os.mkdir(WORKING_DIR) + +# AGE +os.environ["AGE_GRAPH_NAME"] = "chinese" + +postgres_db = PostgreSQLDB( + config={ + "host": "localhost", + "port": 15432, + "user": "rag", + "password": "rag", + "database": "r2", + } +) + + +async def copy_from_postgres_to_json(): + await postgres_db.initdb() + + from_llm_response_cache = PGKVStorage( + namespace="llm_response_cache", + global_config={"embedding_batch_num": 6}, + embedding_func=None, + db=postgres_db, + ) + + to_llm_response_cache = JsonKVStorage( + namespace="llm_response_cache", + global_config={"working_dir": WORKING_DIR}, + embedding_func=None, + ) + + kv = {} + for c_id in await from_llm_response_cache.all_keys(): + print(f"Copying {c_id}") + workspace = c_id["workspace"] + mode = c_id["mode"] + _id = c_id["id"] + postgres_db.workspace = workspace + obj = await from_llm_response_cache.get_by_mode_and_id(mode, _id) + if mode not in kv: + kv[mode] = {} + kv[mode][_id] = obj[_id] + print(f"Object {obj}") + await to_llm_response_cache.upsert(kv) + await to_llm_response_cache.index_done_callback() + print("Mission accomplished!") + + +async def copy_from_json_to_postgres(): + await postgres_db.initdb() + + from_llm_response_cache = JsonKVStorage( + namespace="llm_response_cache", + global_config={"working_dir": WORKING_DIR}, + embedding_func=None, + ) + + to_llm_response_cache = PGKVStorage( + namespace="llm_response_cache", + global_config={"embedding_batch_num": 6}, + embedding_func=None, + db=postgres_db, + ) + + for mode in await from_llm_response_cache.all_keys(): + 
print(f"Copying {mode}") + caches = await from_llm_response_cache.get_by_id(mode) + for k, v in caches.items(): + item = {mode: {k: v}} + print(f"\tCopying {item}") + await to_llm_response_cache.upsert(item) + + +if __name__ == "__main__": + asyncio.run(copy_from_json_to_postgres()) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index b93a345b..86072c9f 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -231,6 +231,16 @@ class PGKVStorage(BaseKVStorage): else: return None + async def all_keys(self) -> list[dict]: + if "llm_response_cache" == self.namespace: + sql = "select workspace,mode,id from lightrag_llm_cache" + res = await self.db.query(sql, multirows=True) + return res + else: + logger.error( + f"all_keys is only implemented for llm_response_cache, not for {self.namespace}" + ) + async def filter_keys(self, keys: List[str]) -> Set[str]: """Filter out duplicated content""" sql = SQL_TEMPLATES["filter_keys"].format( @@ -412,7 +422,10 @@ class PGDocStatusStorage(DocStatusStorage): async def filter_keys(self, data: list[str]) -> set[str]: """Return keys that don't exist in storage""" - sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({",".join([f"'{_id}'" for _id in data])})" + keys = ",".join([f"'{_id}'" for _id in data]) + sql = ( + f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({keys})" + ) result = await self.db.query(sql, {"workspace": self.db.workspace}, True) # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...]. if result is None: