diff --git a/README.md b/README.md
index f66fb3ce..6c981d92 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b94ff821..9905ee74 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -323,7 +323,7 @@ class LightRAG:
         )

     async def ainsert(
-        self, string_or_strings, split_by_character, split_by_character_only
+        self, string_or_strings, split_by_character=None, split_by_character_only=False
     ):
         """Insert documents with checkpoint support

@@ -466,6 +466,73 @@ class LightRAG:
             # Ensure all indexes are updated after each document
             await self._insert_done()

+    def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
+        loop = always_get_an_event_loop()
+        return loop.run_until_complete(
+            self.ainsert_custom_chunks(full_text, text_chunks)
+        )
+
+    async def ainsert_custom_chunks(self, full_text: str, text_chunks: list[str]):
+        update_storage = False
+        try:
+            doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
+            new_docs = {doc_key: {"content": full_text.strip()}}
+
+            _add_doc_keys = await self.full_docs.filter_keys([doc_key])
+            new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
+            if not len(new_docs):
+                logger.warning("This document is already in the storage.")
+                return
+
+            update_storage = True
+            logger.info(f"[New Docs] inserting {len(new_docs)} docs")
+
+            inserting_chunks = {}
+            for chunk_text in text_chunks:
+                chunk_text_stripped = chunk_text.strip()
+                chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
+
+                inserting_chunks[chunk_key] = {
+                    "content": chunk_text_stripped,
+                    "full_doc_id": doc_key,
+                }
+
+            _add_chunk_keys = await self.text_chunks.filter_keys(
+                list(inserting_chunks.keys())
+            )
+            inserting_chunks = {
+                k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
+            }
+            if not len(inserting_chunks):
+                logger.warning("All chunks are already in the storage.")
+                return
+
+            logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
+
+            await self.chunks_vdb.upsert(inserting_chunks)
+
+            logger.info("[Entity Extraction]...")
+            maybe_new_kg = await extract_entities(
+                inserting_chunks,
+                knowledge_graph_inst=self.chunk_entity_relation_graph,
+                entity_vdb=self.entities_vdb,
+                relationships_vdb=self.relationships_vdb,
+                global_config=asdict(self),
+            )
+
+            if maybe_new_kg is None:
+                logger.warning("No new entities and relationships found")
+                return
+            else:
+                self.chunk_entity_relation_graph = maybe_new_kg
+
+            await self.full_docs.upsert(new_docs)
+            await self.text_chunks.upsert(inserting_chunks)
+
+        finally:
+            if update_storage:
+                await self._insert_done()
+
     async def _insert_done(self):
         tasks = []
         for storage_inst in [
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 58ae3703..09871659 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -4,7 +4,6 @@ import re
 from tqdm.asyncio import tqdm as tqdm_async
 from typing import Union
 from collections import Counter, defaultdict
-import warnings
 from .utils import (
     logger,
     clean_str,
@@ -611,15 +610,22 @@ async def kg_query(
         logger.warning("low_level_keywords and high_level_keywords is empty")
         return PROMPTS["fail_response"]
     if ll_keywords == [] and query_param.mode in ["local", "hybrid"]:
-        logger.warning("low_level_keywords is empty")
-        return PROMPTS["fail_response"]
-    else:
-        ll_keywords = ", ".join(ll_keywords)
+        logger.warning(
+            "low_level_keywords is empty, switching from %s mode to global mode",
+            query_param.mode,
+        )
+        query_param.mode = "global"
     if hl_keywords == [] and query_param.mode in ["global", "hybrid"]:
-        logger.warning("high_level_keywords is empty")
-        return PROMPTS["fail_response"]
-    else:
-        hl_keywords = ", ".join(hl_keywords)
+        logger.warning(
+            "high_level_keywords is empty, switching from %s mode to local mode",
+            query_param.mode,
+        )
+        query_param.mode = "local"
+
+    ll_keywords = ", ".join(ll_keywords) if ll_keywords else ""
+    hl_keywords = ", ".join(hl_keywords) if hl_keywords else ""
+
+    logger.info("Using %s mode for query processing", query_param.mode)

     # Build context
     keywords = [ll_keywords, hl_keywords]
@@ -685,78 +691,52 @@ async def _build_query_context(
     # ll_entities_context, ll_relations_context, ll_text_units_context = "", "", ""
     # hl_entities_context, hl_relations_context, hl_text_units_context = "", "", ""

-    ll_kewwords, hl_keywrds = query[0], query[1]
-    if query_param.mode in ["local", "hybrid"]:
-        if ll_kewwords == "":
-            ll_entities_context, ll_relations_context, ll_text_units_context = (
-                "",
-                "",
-                "",
-            )
-            warnings.warn(
-                "Low Level context is None. Return empty Low entity/relationship/source"
-            )
-            query_param.mode = "global"
-        else:
-            (
-                ll_entities_context,
-                ll_relations_context,
-                ll_text_units_context,
-            ) = await _get_node_data(
-                ll_kewwords,
-                knowledge_graph_inst,
-                entities_vdb,
-                text_chunks_db,
-                query_param,
-            )
-    if query_param.mode in ["global", "hybrid"]:
-        if hl_keywrds == "":
-            hl_entities_context, hl_relations_context, hl_text_units_context = (
-                "",
-                "",
-                "",
-            )
-            warnings.warn(
-                "High Level context is None. Return empty High entity/relationship/source"
-            )
-            query_param.mode = "local"
-        else:
-            (
-                hl_entities_context,
-                hl_relations_context,
-                hl_text_units_context,
-            ) = await _get_edge_data(
-                hl_keywrds,
-                knowledge_graph_inst,
-                relationships_vdb,
-                text_chunks_db,
-                query_param,
-            )
-            if (
-                hl_entities_context == ""
-                and hl_relations_context == ""
-                and hl_text_units_context == ""
-            ):
-                logger.warn("No high level context found. Switching to local mode.")
-                query_param.mode = "local"
-    if query_param.mode == "hybrid":
+    ll_keywords, hl_keywords = query[0], query[1]
+
+    if query_param.mode == "local":
+        entities_context, relations_context, text_units_context = await _get_node_data(
+            ll_keywords,
+            knowledge_graph_inst,
+            entities_vdb,
+            text_chunks_db,
+            query_param,
+        )
+    elif query_param.mode == "global":
+        entities_context, relations_context, text_units_context = await _get_edge_data(
+            hl_keywords,
+            knowledge_graph_inst,
+            relationships_vdb,
+            text_chunks_db,
+            query_param,
+        )
+    else:  # hybrid mode
+        (
+            ll_entities_context,
+            ll_relations_context,
+            ll_text_units_context,
+        ) = await _get_node_data(
+            ll_keywords,
+            knowledge_graph_inst,
+            entities_vdb,
+            text_chunks_db,
+            query_param,
+        )
+        (
+            hl_entities_context,
+            hl_relations_context,
+            hl_text_units_context,
+        ) = await _get_edge_data(
+            hl_keywords,
+            knowledge_graph_inst,
+            relationships_vdb,
+            text_chunks_db,
+            query_param,
+        )
         entities_context, relations_context, text_units_context = combine_contexts(
             [hl_entities_context, ll_entities_context],
             [hl_relations_context, ll_relations_context],
             [hl_text_units_context, ll_text_units_context],
         )
-    elif query_param.mode == "local":
-        entities_context, relations_context, text_units_context = (
-            ll_entities_context,
-            ll_relations_context,
-            ll_text_units_context,
-        )
-    elif query_param.mode == "global":
-        entities_context, relations_context, text_units_context = (
-            hl_entities_context,
-            hl_relations_context,
-            hl_text_units_context,
-        )
     return f"""
 -----Entities-----
 ```csv