From a642bb31904624b4317c2a15a2ee953b33825d01 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 25 Feb 2025 12:08:49 +0800 Subject: [PATCH] refactor: use shared manager from main process for storage implementations. --- lightrag/kg/faiss_impl.py | 7 +++---- lightrag/kg/json_doc_status_impl.py | 4 ++-- lightrag/kg/json_kv_impl.py | 4 ++-- lightrag/kg/nano_vector_db_impl.py | 5 ++--- lightrag/kg/networkx_impl.py | 5 ++--- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index 4324e965..2e129472 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -3,13 +3,12 @@ import time import asyncio from typing import Any, final import threading -from multiprocessing import Manager - import json import numpy as np from dataclasses import dataclass import pipmaster as pm +from lightrag.api.utils_api import manager as main_process_manager from lightrag.utils import ( logger, @@ -22,7 +21,7 @@ from lightrag.base import ( if not pm.is_installed("faiss"): pm.install("faiss") -import faiss +import faiss # type: ignore # Global variables for shared memory management _init_lock = threading.Lock() @@ -37,7 +36,7 @@ def _get_manager(): with _init_lock: if _manager is None: try: - _manager = Manager() + _manager = main_process_manager _shared_indices = _manager.dict() _shared_meta = _manager.dict() except Exception as e: diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 431e340c..dd3a7b64 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -2,7 +2,6 @@ from dataclasses import dataclass import os from typing import Any, Union, final import threading -from multiprocessing import Manager from lightrag.base import ( DocProcessingStatus, @@ -14,6 +13,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.api.utils_api import manager as main_process_manager # Global variables for shared memory management _init_lock = threading.Lock() @@ -27,7 +27,7 @@ def _get_manager(): with _init_lock: if _manager is None: try: - _manager = Manager() + _manager = main_process_manager _shared_doc_status_data = _manager.dict() except Exception as e: logger.error(f"Failed to initialize shared memory manager: {e}") diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index f03fda63..f5a8b488 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -3,7 +3,6 @@ import os from dataclasses import dataclass from typing import Any, final import threading -from multiprocessing import Manager from lightrag.base import ( BaseKVStorage, @@ -13,6 +12,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.api.utils_api import manager as main_process_manager # Global variables for shared memory management _init_lock = threading.Lock() @@ -26,7 +26,7 @@ def _get_manager(): with _init_lock: if _manager is None: try: - _manager = Manager() + _manager = main_process_manager _shared_kv_data = _manager.dict() except Exception as e: logger.error(f"Failed to initialize shared memory manager: {e}") diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index d68b7f42..7c15142e 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -4,14 +4,13 @@ from typing import Any, final from dataclasses import dataclass import numpy as np import threading -from multiprocessing import Manager - import time from lightrag.utils import ( logger, compute_mdhash_id, ) +from lightrag.api.utils_api import manager as main_process_manager import pipmaster as pm from lightrag.base import ( BaseVectorStorage, @@ -34,7 +33,7 @@ def _get_manager(): with _init_lock: if _manager is None: try: - _manager = Manager() + _manager = main_process_manager _shared_vector_clients = _manager.dict() except Exception as e: logger.error(f"Failed to initialize shared memory manager: {e}") diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index c88d1c59..f3dd92dc 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -2,14 +2,13 @@ import os from dataclasses import dataclass from typing import Any, final import threading -from multiprocessing import Manager - import numpy as np from lightrag.types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge from lightrag.utils import ( logger, ) +from lightrag.api.utils_api import manager as main_process_manager from lightrag.base import ( BaseGraphStorage, @@ -37,7 +36,7 @@ def _get_manager(): with _init_lock: if _manager is None: try: - _manager = Manager() + _manager = main_process_manager _shared_graphs = _manager.dict() except Exception as e: logger.error(f"Failed to initialize shared memory manager: {e}")