Refactor storage initialization to avoid redundant initial data loads across processes; show init logs only on the first load
This commit is contained in:
@@ -57,16 +57,16 @@ class FaissVectorDBStorage(BaseVectorStorage):
|
||||
# If you have a large number of vectors, you might want IVF or other indexes.
|
||||
# For demonstration, we use a simple IndexFlatIP.
|
||||
self._index.value = faiss.IndexFlatIP(self._dim)
|
||||
# Keep a local store for metadata, IDs, etc.
|
||||
# Maps <int faiss_id> → metadata (including your original ID).
|
||||
self._id_to_meta.update({})
|
||||
# Attempt to load an existing index + metadata from disk
|
||||
self._load_faiss_index()
|
||||
else:
|
||||
if self._index is None:
|
||||
self._index = faiss.IndexFlatIP(self._dim)
|
||||
|
||||
# Keep a local store for metadata, IDs, etc.
|
||||
# Maps <int faiss_id> → metadata (including your original ID).
|
||||
self._id_to_meta.update({})
|
||||
|
||||
# Attempt to load an existing index + metadata from disk
|
||||
self._load_faiss_index()
|
||||
self._id_to_meta.update({})
|
||||
self._load_faiss_index()
|
||||
|
||||
|
||||
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
|
||||
|
@@ -26,8 +26,9 @@ class JsonDocStatusStorage(DocStatusStorage):
|
||||
self._storage_lock = get_storage_lock()
|
||||
self._data = get_namespace_data(self.namespace)
|
||||
with self._storage_lock:
|
||||
self._data.update(load_json(self._file_name) or {})
|
||||
logger.info(f"Loaded document status storage with {len(self._data)} records")
|
||||
if not self._data:
|
||||
self._data.update(load_json(self._file_name) or {})
|
||||
logger.info(f"Loaded document status storage with {len(self._data)} records")
|
||||
|
||||
async def filter_keys(self, keys: set[str]) -> set[str]:
|
||||
"""Return keys that should be processed (not in storage or not successfully processed)"""
|
||||
|
@@ -18,13 +18,13 @@ from .shared_storage import get_namespace_data, get_storage_lock
|
||||
class JsonKVStorage(BaseKVStorage):
|
||||
def __post_init__(self):
|
||||
working_dir = self.global_config["working_dir"]
|
||||
self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
|
||||
self._storage_lock = get_storage_lock()
|
||||
self._data = get_namespace_data(self.namespace)
|
||||
with self._storage_lock:
|
||||
if not self._data:
|
||||
self._file_name = os.path.join(working_dir, f"kv_store_{self.namespace}.json")
|
||||
self._data: dict[str, Any] = load_json(self._file_name) or {}
|
||||
logger.info(f"Load KV {self.namespace} with {len(self._data)} data")
|
||||
logger.info(f"Load KV {self.namespace} with {len(self._data)} data")
|
||||
|
||||
|
||||
async def index_done_callback(self) -> None:
|
||||
|
@@ -47,14 +47,14 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
||||
if self._client.value is None:
|
||||
self._client.value = NanoVectorDB(
|
||||
self.embedding_func.embedding_dim, storage_file=self._client_file_name
|
||||
)
|
||||
)
|
||||
logger.info(f"Initialized vector DB client for namespace {self.namespace}")
|
||||
else:
|
||||
if self._client is None:
|
||||
self._client = NanoVectorDB(
|
||||
self.embedding_func.embedding_dim, storage_file=self._client_file_name
|
||||
)
|
||||
|
||||
logger.info(f"Initialized vector DB client for namespace {self.namespace}")
|
||||
logger.info(f"Initialized vector DB client for namespace {self.namespace}")
|
||||
|
||||
def _get_client(self):
|
||||
"""Get the appropriate client instance based on multiprocess mode"""
|
||||
|
Reference in New Issue
Block a user