Merge pull request #581 from jiabin2wang20230918/fix_mongo

Fix for mongoDB document 16MB limit and add os env NEO4J_MAX_CONNECTION_POOL_SIZE for neo4j
This commit is contained in:
zrguo
2025-01-13 18:38:22 +08:00
committed by GitHub
2 changed files with 34 additions and 5 deletions

View File

@@ -2,7 +2,7 @@ import os
from tqdm.asyncio import tqdm as tqdm_async from tqdm.asyncio import tqdm as tqdm_async
from dataclasses import dataclass from dataclasses import dataclass
from pymongo import MongoClient from pymongo import MongoClient
from typing import Union
from lightrag.utils import logger from lightrag.utils import logger
from lightrag.base import BaseKVStorage from lightrag.base import BaseKVStorage
@@ -41,11 +41,35 @@ class MongoKVStorage(BaseKVStorage):
return set([s for s in data if s not in existing_ids]) return set([s for s in data if s not in existing_ids])
async def upsert(self, data: dict[str, dict]): async def upsert(self, data: dict[str, dict]):
if self.namespace == "llm_response_cache":
for mode, items in data.items():
for k, v in tqdm_async(items.items(), desc="Upserting"):
key = f"{mode}_{k}"
result = self._data.update_one(
{"_id": key}, {"$setOnInsert": v}, upsert=True
)
if result.upserted_id:
logger.debug(f"\nInserted new document with key: {key}")
data[mode][k]["_id"] = key
else:
for k, v in tqdm_async(data.items(), desc="Upserting"): for k, v in tqdm_async(data.items(), desc="Upserting"):
self._data.update_one({"_id": k}, {"$set": v}, upsert=True) self._data.update_one({"_id": k}, {"$set": v}, upsert=True)
data[k]["_id"] = k data[k]["_id"] = k
return data return data
async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]:
if "llm_response_cache" == self.namespace:
res = {}
v = self._data.find_one({"_id": mode + "_" + id})
if v:
res[id] = v
logger.debug(f"llm_response_cache find one by:{id}")
return res
else:
return None
else:
return None
async def drop(self): async def drop(self):
""" """ """ """
pass pass

View File

@@ -39,6 +39,7 @@ class Neo4JStorage(BaseGraphStorage):
URI = os.environ["NEO4J_URI"] URI = os.environ["NEO4J_URI"]
USERNAME = os.environ["NEO4J_USERNAME"] USERNAME = os.environ["NEO4J_USERNAME"]
PASSWORD = os.environ["NEO4J_PASSWORD"] PASSWORD = os.environ["NEO4J_PASSWORD"]
MAX_CONNECTION_POOL_SIZE = os.environ.get("NEO4J_MAX_CONNECTION_POOL_SIZE", 800)
DATABASE = os.environ.get( DATABASE = os.environ.get(
"NEO4J_DATABASE" "NEO4J_DATABASE"
) # If this param is None, the home database will be used. If it is not None, the specified database will be used. ) # If this param is None, the home database will be used. If it is not None, the specified database will be used.
@@ -47,7 +48,11 @@ class Neo4JStorage(BaseGraphStorage):
URI, auth=(USERNAME, PASSWORD) URI, auth=(USERNAME, PASSWORD)
) )
_database_name = "home database" if DATABASE is None else f"database {DATABASE}" _database_name = "home database" if DATABASE is None else f"database {DATABASE}"
with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD)) as _sync_driver: with GraphDatabase.driver(
URI,
auth=(USERNAME, PASSWORD),
max_connection_pool_size=MAX_CONNECTION_POOL_SIZE,
) as _sync_driver:
try: try:
with _sync_driver.session(database=DATABASE) as session: with _sync_driver.session(database=DATABASE) as session:
try: try: