diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index 61222357..fbbae8c2 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -2,7 +2,7 @@ import os
 from tqdm.asyncio import tqdm as tqdm_async
 from dataclasses import dataclass
 from pymongo import MongoClient
-
+from typing import Union
 from lightrag.utils import logger
 
 from lightrag.base import BaseKVStorage
@@ -41,11 +41,49 @@ class MongoKVStorage(BaseKVStorage):
         return set([s for s in data if s not in existing_ids])
 
     async def upsert(self, data: dict[str, dict]):
-        for k, v in tqdm_async(data.items(), desc="Upserting"):
-            self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
-            data[k]["_id"] = k
+        """Upsert documents and return ``data`` with ``_id`` fields filled in.
+
+        For the ``llm_response_cache`` namespace ``data`` is nested as
+        ``{mode: {id: doc}}``; each doc is stored under ``f"{mode}_{id}"``
+        with ``$setOnInsert``, so an existing cache entry is left unchanged.
+        For any other namespace ``data`` is ``{id: doc}`` and each doc is
+        written with ``$set``.
+        """
+        if self.namespace == "llm_response_cache":
+            for mode, items in data.items():
+                for k, v in tqdm_async(items.items(), desc="Upserting"):
+                    key = f"{mode}_{k}"
+                    result = self._data.update_one(
+                        {"_id": key}, {"$setOnInsert": v}, upsert=True
+                    )
+                    if result.upserted_id:
+                        logger.debug(f"\nInserted new document with key: {key}")
+                    data[mode][k]["_id"] = key
+        else:
+            for k, v in tqdm_async(data.items(), desc="Upserting"):
+                self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
+                data[k]["_id"] = k
         return data
 
+    async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
+        """Look up one cached LLM response by mode and id.
+
+        Returns ``{id: document}`` when this storage is the
+        ``llm_response_cache`` namespace and a document with ``_id``
+        ``f"{mode}_{id}"`` exists; otherwise returns ``None``.
+        """
+        if "llm_response_cache" == self.namespace:
+            res = {}
+            v = self._data.find_one({"_id": mode + "_" + id})
+            if v:
+                res[id] = v
+                logger.debug(f"llm_response_cache find one by:{id}")
+                return res
+            else:
+                return None
+        else:
+            return None
+
     async def drop(self):
         """ """
         pass
diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py
index 884fcb40..8c2afb5d 100644
--- a/lightrag/kg/neo4j_impl.py
+++ b/lightrag/kg/neo4j_impl.py
@@ -39,6 +39,10 @@ class Neo4JStorage(BaseGraphStorage):
         URI = os.environ["NEO4J_URI"]
         USERNAME = os.environ["NEO4J_USERNAME"]
         PASSWORD = os.environ["NEO4J_PASSWORD"]
+        # Environment values are strings; the driver needs an integer pool size.
+        MAX_CONNECTION_POOL_SIZE = int(
+            os.environ.get("NEO4J_MAX_CONNECTION_POOL_SIZE", 800)
+        )
         DATABASE = os.environ.get(
             "NEO4J_DATABASE"
         )  # If this param is None, the home database will be used. If it is not None, the specified database will be used.
@@ -47,7 +51,11 @@ class Neo4JStorage(BaseGraphStorage):
             URI, auth=(USERNAME, PASSWORD)
         )
         _database_name = "home database" if DATABASE is None else f"database {DATABASE}"
-        with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD)) as _sync_driver:
+        with GraphDatabase.driver(
+            URI,
+            auth=(USERNAME, PASSWORD),
+            max_connection_pool_size=MAX_CONNECTION_POOL_SIZE,
+        ) as _sync_driver:
             try:
                 with _sync_driver.session(database=DATABASE) as session:
                     try: