From 88d691deb9190bf27d1021d5fcc2294dab18afc0 Mon Sep 17 00:00:00 2001 From: ArnoChen Date: Fri, 7 Feb 2025 23:04:29 +0800 Subject: [PATCH 1/3] add namespace prefix to storage namespaces --- lightrag/api/lightrag_server.py | 16 ++++++++++++++- lightrag/api/ollama_api.py | 2 +- lightrag/kg/mongo_impl.py | 4 ++-- lightrag/kg/oracle_impl.py | 18 +++++++++-------- lightrag/kg/postgres_impl.py | 26 +++++++++++++----------- lightrag/kg/tidb_impl.py | 16 ++++++++------- lightrag/lightrag.py | 35 +++++++++++++++++---------------- 7 files changed, 69 insertions(+), 48 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index af413f58..836411d1 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -40,7 +40,7 @@ from .ollama_api import ( from .ollama_api import ollama_server_infos # Load environment variables -load_dotenv() +load_dotenv(override=True) class RAGStorageConfig: @@ -532,6 +532,16 @@ def parse_args() -> argparse.Namespace: help="Number of conversation history turns to include (default: from env or 3)", ) + # Namespace + parser.add_argument( + "--namespace-prefix", + type=str, + default=get_env_value( + "NAMESPACE_PREFIX", "" + ), + help="Prefix of the namespace", + ) + args = parser.parse_args() ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name @@ -861,6 +871,8 @@ def create_app(args): "similarity_threshold": 0.95, "use_llm_check": False, }, + log_level=args.log_level, + namespace_prefix=args.namespace_prefix, ) else: rag = LightRAG( @@ -890,6 +902,8 @@ def create_app(args): "similarity_threshold": 0.95, "use_llm_check": False, }, + log_level=args.log_level, + namespace_prefix=args.namespace_prefix, ) async def index_file(file_path: Union[str, Path]) -> None: diff --git a/lightrag/api/ollama_api.py b/lightrag/api/ollama_api.py index edf97993..0d96e16d 100644 --- a/lightrag/api/ollama_api.py +++ b/lightrag/api/ollama_api.py @@ -15,7 +15,7 @@ from dotenv import load_dotenv # Load environment variables -load_dotenv() +load_dotenv(override=True) class OllamaServerInfos: diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 1c9ce50e..7cef8c0f 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -52,7 +52,7 @@ class MongoKVStorage(BaseKVStorage): return set([s for s in data if s not in existing_ids]) async def upsert(self, data: dict[str, dict]): - if self.namespace == "llm_response_cache": + if self.namespace.endswith("llm_response_cache"): for mode, items in data.items(): for k, v in tqdm_async(items.items(), desc="Upserting"): key = f"{mode}_{k}" @@ -69,7 +69,7 @@ class MongoKVStorage(BaseKVStorage): return data async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]: - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): res = {} v = self._data.find_one({"_id": mode + "_" + id}) if v: diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py index 368e8618..8af01c47 100644 --- a/lightrag/kg/oracle_impl.py +++ b/lightrag/kg/oracle_impl.py @@ -185,7 +185,7 @@ class OracleKVStorage(BaseKVStorage): SQL = SQL_TEMPLATES["get_by_id_" + self.namespace] params = {"workspace": self.db.workspace, "id": id} # print("get_by_id:"+SQL) - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): array_res = await self.db.query(SQL, params, multirows=True) res = {} for row in array_res: @@ -201,7 +201,7 @@ class OracleKVStorage(BaseKVStorage): """Specifically for llm_response_cache.""" SQL = SQL_TEMPLATES["get_by_mode_id_" + self.namespace] params = {"workspace": self.db.workspace, "cache_mode": mode, "id": id} - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): array_res = await self.db.query(SQL, params, multirows=True) res = {} for row in array_res: @@ -218,7 +218,7 @@ class OracleKVStorage(BaseKVStorage): params = {"workspace": self.db.workspace} # print("get_by_ids:"+SQL) res = await self.db.query(SQL, params, multirows=True) - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): modes = set() dict_res: dict[str, dict] = {} for row in res: @@ -269,7 +269,7 @@ class OracleKVStorage(BaseKVStorage): ################ INSERT METHODS ################ async def upsert(self, data: dict[str, dict]): - if self.namespace == "text_chunks": + if self.namespace.endswith("text_chunks"): list_data = [ { "id": k, @@ -302,7 +302,7 @@ class OracleKVStorage(BaseKVStorage): "status": item["status"], } await self.db.execute(merge_sql, _data) - if self.namespace == "full_docs": + if self.namespace.endswith("full_docs"): for k, v in data.items(): # values.clear() merge_sql = SQL_TEMPLATES["merge_doc_full"] @@ -313,7 +313,7 @@ class OracleKVStorage(BaseKVStorage): } await self.db.execute(merge_sql, _data) - if self.namespace == "llm_response_cache": + if self.namespace.endswith("llm_response_cache"): for mode, items in data.items(): for k, v in items.items(): upsert_sql = SQL_TEMPLATES["upsert_llm_response_cache"] @@ -334,8 +334,10 @@ class OracleKVStorage(BaseKVStorage): await self.db.execute(SQL, params) async def index_done_callback(self): - if self.namespace in ["full_docs", "text_chunks"]: - logger.info("full doc and chunk data had been saved into oracle db!") + for n in ("full_docs", "text_chunks"): + if self.namespace.endswith(n): + logger.info("full doc and chunk data had been saved into oracle db!") + break @dataclass diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index ba91c94a..2f6d1bc4 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -187,7 +187,7 @@ class PGKVStorage(BaseKVStorage): """Get doc_full data by id.""" sql = SQL_TEMPLATES["get_by_id_" + self.namespace] params = {"workspace": self.db.workspace, "id": id} - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): array_res = await self.db.query(sql, params, multirows=True) res = {} for row in array_res: @@ -203,7 +203,7 @@ class PGKVStorage(BaseKVStorage): """Specifically for llm_response_cache.""" sql = SQL_TEMPLATES["get_by_mode_id_" + self.namespace] params = {"workspace": self.db.workspace, mode: mode, "id": id} - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): array_res = await self.db.query(sql, params, multirows=True) res = {} for row in array_res: @@ -219,7 +219,7 @@ class PGKVStorage(BaseKVStorage): ids=",".join([f"'{id}'" for id in ids]) ) params = {"workspace": self.db.workspace} - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): array_res = await self.db.query(sql, params, multirows=True) modes = set() dict_res: dict[str, dict] = {} @@ -239,7 +239,7 @@ class PGKVStorage(BaseKVStorage): return None async def all_keys(self) -> list[dict]: - if "llm_response_cache" == self.namespace: + if self.namespace.endswith("llm_response_cache"): sql = "select workspace,mode,id from lightrag_llm_cache" res = await self.db.query(sql, multirows=True) return res @@ -270,9 +270,9 @@ class PGKVStorage(BaseKVStorage): ################ INSERT METHODS ################ async def upsert(self, data: Dict[str, dict]): - if self.namespace == "text_chunks": + if self.namespace.endswith("text_chunks"): pass - elif self.namespace == "full_docs": + elif self.namespace.endswith("full_docs"): for k, v in data.items(): upsert_sql = SQL_TEMPLATES["upsert_doc_full"] _data = { @@ -281,7 +281,7 @@ class PGKVStorage(BaseKVStorage): "workspace": self.db.workspace, } await self.db.execute(upsert_sql, _data) - elif self.namespace == "llm_response_cache": + elif self.namespace.endswith("llm_response_cache"): for mode, items in data.items(): for k, v in items.items(): upsert_sql = SQL_TEMPLATES["upsert_llm_response_cache"] @@ -296,8 +296,10 @@ class PGKVStorage(BaseKVStorage): await self.db.execute(upsert_sql, _data) async def index_done_callback(self): - if self.namespace in ["full_docs", "text_chunks"]: - logger.info("full doc and chunk data had been saved into postgresql db!") + for n in ("full_docs", "text_chunks"): + if self.namespace.endswith(n): + logger.info("full doc and chunk data had been saved into postgresql db!") + break @dataclass @@ -389,11 +391,11 @@ class PGVectorStorage(BaseVectorStorage): for i, d in enumerate(list_data): d["__vector__"] = embeddings[i] for item in list_data: - if self.namespace == "chunks": + if self.namespace.endswith("chunks"): upsert_sql, data = self._upsert_chunks(item) - elif self.namespace == "entities": + elif self.namespace.endswith("entities"): upsert_sql, data = self._upsert_entities(item) - elif self.namespace == "relationships": + elif self.namespace.endswith("relationships"): upsert_sql, data = self._upsert_relationships(item) else: raise ValueError(f"{self.namespace} is not supported") diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index 0579a57c..548ca8e0 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -160,7 +160,7 @@ class TiDBKVStorage(BaseKVStorage): async def upsert(self, data: dict[str, dict]): left_data = {k: v for k, v in data.items() if k not in self._data} self._data.update(left_data) - if self.namespace == "text_chunks": + if self.namespace.endswith("text_chunks"): list_data = [ { "__id__": k, @@ -196,7 +196,7 @@ class TiDBKVStorage(BaseKVStorage): ) await self.db.execute(merge_sql, data) - if self.namespace == "full_docs": + if self.namespace.endswith("full_docs"): merge_sql = SQL_TEMPLATES["upsert_doc_full"] data = [] for k, v in self._data.items(): @@ -211,8 +211,10 @@ class TiDBKVStorage(BaseKVStorage): return left_data async def index_done_callback(self): - if self.namespace in ["full_docs", "text_chunks"]: - logger.info("full doc and chunk data had been saved into TiDB db!") + for n in ("full_docs", "text_chunks"): + if self.namespace.endswith(n): + logger.info("full doc and chunk data had been saved into TiDB db!") + break @dataclass @@ -258,7 +260,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): if not len(data): logger.warning("You insert an empty data to vector DB") return [] - if self.namespace == "chunks": + if self.namespace.endswith("chunks"): return [] logger.info(f"Inserting {len(data)} vectors to {self.namespace}") @@ -288,7 +290,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): for i, d in enumerate(list_data): d["content_vector"] = embeddings[i] - if self.namespace == "entities": + if self.namespace.endswith("entities"): data = [] for item in list_data: param = { @@ -309,7 +311,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): merge_sql = SQL_TEMPLATES["insert_entity"] await self.db.execute(merge_sql, data) - elif self.namespace == "relationships": + elif self.namespace.endswith("relationships"): data = [] for item in list_data: param = { diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 420b82eb..55327df2 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -167,6 +167,7 @@ class LightRAG: # storage vector_db_storage_cls_kwargs: dict = field(default_factory=dict) + namespace_prefix: str = field(default="") enable_llm_cache: bool = True # Sometimes there are some reason the LLM failed at Extracting Entities, and we want to continue without LLM cost, we can use this flag @@ -228,12 +229,12 @@ class LightRAG: ) self.json_doc_status_storage = self.key_string_value_json_storage_cls( - namespace="json_doc_status_storage", + namespace=self.namespace_prefix + "json_doc_status_storage", embedding_func=None, ) self.llm_response_cache = self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", embedding_func=self.embedding_func, ) @@ -241,15 +242,15 @@ class LightRAG: # add embedding func by walter #### self.full_docs = self.key_string_value_json_storage_cls( - namespace="full_docs", + namespace=self.namespace_prefix + "full_docs", embedding_func=self.embedding_func, ) self.text_chunks = self.key_string_value_json_storage_cls( - namespace="text_chunks", + namespace=self.namespace_prefix + "text_chunks", embedding_func=self.embedding_func, ) self.chunk_entity_relation_graph = self.graph_storage_cls( - namespace="chunk_entity_relation", + namespace=self.namespace_prefix + "chunk_entity_relation", embedding_func=self.embedding_func, ) #### @@ -257,17 +258,17 @@ class LightRAG: #### self.entities_vdb = self.vector_db_storage_cls( - namespace="entities", + namespace=self.namespace_prefix + "entities", embedding_func=self.embedding_func, meta_fields={"entity_name"}, ) self.relationships_vdb = self.vector_db_storage_cls( - namespace="relationships", + namespace=self.namespace_prefix + "relationships", embedding_func=self.embedding_func, meta_fields={"src_id", "tgt_id"}, ) self.chunks_vdb = self.vector_db_storage_cls( - namespace="chunks", + namespace=self.namespace_prefix + "chunks", embedding_func=self.embedding_func, ) @@ -277,7 +278,7 @@ class LightRAG: hashing_kv = self.llm_response_cache else: hashing_kv = self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", embedding_func=self.embedding_func, ) @@ -292,7 +293,7 @@ class LightRAG: # Initialize document status storage self.doc_status_storage_cls = self._get_storage_class(self.doc_status_storage) self.doc_status = self.doc_status_storage_cls( - namespace="doc_status", + namespace=self.namespace_prefix + "doc_status", global_config=global_config, embedding_func=None, ) @@ -928,7 +929,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), @@ -945,7 +946,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), @@ -964,7 +965,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), @@ -1005,7 +1006,7 @@ class LightRAG: global_config=asdict(self), hashing_kv=self.llm_response_cache or self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), @@ -1036,7 +1037,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_funcne, ), @@ -1052,7 +1053,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), @@ -1071,7 +1072,7 @@ class LightRAG: if self.llm_response_cache and hasattr(self.llm_response_cache, "global_config") else self.key_string_value_json_storage_cls( - namespace="llm_response_cache", + namespace=self.namespace_prefix + "llm_response_cache", global_config=asdict(self), embedding_func=self.embedding_func, ), From f974bf39bb08b29aab9b81ae3d88922413641335 Mon Sep 17 00:00:00 2001 From: ArnoChen Date: Fri, 7 Feb 2025 23:13:28 +0800 Subject: [PATCH 2/3] format format --- lightrag/api/lightrag_server.py | 4 +--- lightrag/kg/postgres_impl.py | 4 +++- lightrag/kg/tidb_impl.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 836411d1..23507dd1 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -536,9 +536,7 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--namespace-prefix", type=str, - default=get_env_value( - "NAMESPACE_PREFIX", "" - ), + default=get_env_value("NAMESPACE_PREFIX", ""), help="Prefix of the namespace", ) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 2f6d1bc4..a031a0c3 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -298,7 +298,9 @@ class PGKVStorage(BaseKVStorage): async def index_done_callback(self): for n in ("full_docs", "text_chunks"): if self.namespace.endswith(n): - logger.info("full doc and chunk data had been saved into postgresql db!") + logger.info( + "full doc and chunk data had been saved into postgresql db!" + ) break diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index 548ca8e0..4a21d067 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -190,7 +190,7 @@ class TiDBKVStorage(BaseKVStorage): "tokens": item["tokens"], "chunk_order_index": item["chunk_order_index"], "full_doc_id": item["full_doc_id"], - "content_vector": f"{item["__vector__"].tolist()}", + "content_vector": f"{item['__vector__'].tolist()}", "workspace": self.db.workspace, } ) @@ -297,7 +297,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): "id": item["id"], "name": item["entity_name"], "content": item["content"], - "content_vector": f"{item["content_vector"].tolist()}", + "content_vector": f"{item['content_vector'].tolist()}", "workspace": self.db.workspace, } # update entity_id if node inserted by graph_storage_instance before @@ -319,7 +319,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): "source_name": item["src_id"], "target_name": item["tgt_id"], "content": item["content"], - "content_vector": f"{item["content_vector"].tolist()}", + "content_vector": f"{item['content_vector'].tolist()}", "workspace": self.db.workspace, } # update relation_id if node inserted by graph_storage_instance before From b2b8cf0aa440b1cb32bc9bf477b0bd2bd4f450a1 Mon Sep 17 00:00:00 2001 From: ArnoChen Date: Sat, 8 Feb 2025 13:56:12 +0800 Subject: [PATCH 3/3] remove unused json_doc_status_storage --- lightrag/lightrag.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 55327df2..abc9390c 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -228,11 +228,6 @@ class LightRAG: self.graph_storage_cls, global_config=global_config ) - self.json_doc_status_storage = self.key_string_value_json_storage_cls( - namespace=self.namespace_prefix + "json_doc_status_storage", - embedding_func=None, - ) - self.llm_response_cache = self.key_string_value_json_storage_cls( namespace=self.namespace_prefix + "llm_response_cache", embedding_func=self.embedding_func, @@ -253,6 +248,7 @@ class LightRAG: namespace=self.namespace_prefix + "chunk_entity_relation", embedding_func=self.embedding_func, ) + #### # add embedding func by walter over ####