diff --git a/lightrag/api/__init__.py b/lightrag/api/__init__.py
index 97955f02..608fbe26 100644
--- a/lightrag/api/__init__.py
+++ b/lightrag/api/__init__.py
@@ -1 +1 @@
-__api_version__ = "0142"
+__api_version__ = "0143"
diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index 944d57d7..2611e78f 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -116,7 +116,7 @@ class JsonDocStatusStorage(DocStatusStorage):
         """
         if not data:
             return
-        logger.info(f"Inserting {len(data)} records to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} records to {self.namespace}")
         async with self._storage_lock:
             self._data.update(data)
             await set_all_update_flags(self.namespace)
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index 82c18d95..d0cd64fb 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -121,7 +121,7 @@ class JsonKVStorage(BaseKVStorage):
         """
         if not data:
             return
-        logger.info(f"Inserting {len(data)} records to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} records to {self.namespace}")
         async with self._storage_lock:
             self._data.update(data)
             await set_all_update_flags(self.namespace)
diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py
index 56a52b92..3003eea0 100644
--- a/lightrag/kg/nano_vector_db_impl.py
+++ b/lightrag/kg/nano_vector_db_impl.py
@@ -85,7 +85,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
            KG-storage-log should be used to avoid data corruption

        """
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return

diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py
index 2bea85a3..e746af2e 100644
--- a/lightrag/kg/networkx_impl.py
+++ b/lightrag/kg/networkx_impl.py
@@ -392,7 +392,7 @@ class NetworkXStorage(BaseGraphStorage):
             # Check if storage was updated by another process
             if self.storage_updated.value:
                 # Storage was updated by another process, reload data instead of saving
-                logger.warning(
+                logger.info(
                     f"Graph for {self.namespace} was updated by another process, reloading..."
                 )
                 self._graph = (
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index c7947e43..1ba4ada5 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -361,7 +361,7 @@ class PGKVStorage(BaseKVStorage):
     ################ INSERT METHODS ################

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return

@@ -560,7 +560,7 @@ class PGVectorStorage(BaseVectorStorage):
         return upsert_sql, data

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return

@@ -949,7 +949,7 @@ class PGDocStatusStorage(DocStatusStorage):
         Args:
             data: dictionary of document IDs and their status data
         """
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return

diff --git a/lightrag/operate.py b/lightrag/operate.py
index 97a356ad..c7f9a99c 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -24,8 +24,8 @@ from .utils import (
     handle_cache,
     save_to_cache,
     CacheData,
-    statistic_data,
     get_conversation_turns,
+    use_llm_func_with_cache,
 )
 from .base import (
     BaseGraphStorage,
@@ -106,6 +106,9 @@ async def _handle_entity_relation_summary(
     entity_or_relation_name: str,
     description: str,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
+    llm_response_cache: BaseKVStorage | None = None,
 ) -> str:
     """Handle entity relation summary
     For each entity or relation, input is the combined description of already existing description and new description.
@@ -122,6 +125,7 @@ async def _handle_entity_relation_summary(
     tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
     if len(tokens) < summary_max_tokens:  # No need for summary
         return description
+
     prompt_template = PROMPTS["summarize_entity_descriptions"]
     use_description = decode_tokens_by_tiktoken(
         tokens[:llm_max_tokens], model_name=tiktoken_model_name
@@ -133,7 +137,23 @@ async def _handle_entity_relation_summary(
     )
     use_prompt = prompt_template.format(**context_base)
     logger.debug(f"Trigger summary: {entity_or_relation_name}")
-    summary = await use_llm_func(use_prompt, max_tokens=summary_max_tokens)
+
+    # Update pipeline status when LLM summary is needed
+    status_message = "Use LLM to re-summarize description..."
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
+
+    # Use LLM function with cache
+    summary = await use_llm_func_with_cache(
+        use_prompt,
+        use_llm_func,
+        llm_response_cache=llm_response_cache,
+        max_tokens=summary_max_tokens,
+        cache_type="extract",
+    )
     return summary


@@ -212,6 +232,9 @@ async def _merge_nodes_then_upsert(
     nodes_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
+    llm_response_cache: BaseKVStorage | None = None,
 ):
     """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
     already_entity_types = []
@@ -221,6 +244,14 @@ async def _merge_nodes_then_upsert(

     already_node = await knowledge_graph_inst.get_node(entity_name)
     if already_node is not None:
+        # Update pipeline status when a node that needs merging is found
+        status_message = f"Merging entity: {entity_name}"
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+
         already_entity_types.append(already_node["entity_type"])
         already_source_ids.extend(
             split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
@@ -249,7 +280,12 @@

     logger.debug(f"file_path: {file_path}")
     description = await _handle_entity_relation_summary(
-        entity_name, description, global_config
+        entity_name,
+        description,
+        global_config,
+        pipeline_status,
+        pipeline_status_lock,
+        llm_response_cache,
     )
     node_data = dict(
         entity_id=entity_name,
@@ -272,6 +308,9 @@ async def _merge_edges_then_upsert(
     edges_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
+    llm_response_cache: BaseKVStorage | None = None,
 ):
     already_weights = []
     already_source_ids = []
@@ -280,6 +319,14 @@ async def _merge_edges_then_upsert(
     already_file_paths = []

     if await knowledge_graph_inst.has_edge(src_id, tgt_id):
+        # Update pipeline status when an edge that needs merging is found
+        status_message = f"Merging edge: {src_id} - {tgt_id}"
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+
         already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
         # Handle the case where get_edge returns None or missing fields
         if already_edge:
@@ -358,7 +405,12 @@
                 },
             )
     description = await _handle_entity_relation_summary(
-        f"({src_id}, {tgt_id})", description, global_config
+        f"({src_id}, {tgt_id})",
+        description,
+        global_config,
+        pipeline_status,
+        pipeline_status_lock,
+        llm_response_cache,
     )
     await knowledge_graph_inst.upsert_edge(
         src_id,
@@ -396,9 +448,6 @@ async def extract_entities(
 ) -> None:
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
-    enable_llm_cache_for_entity_extract: bool = global_config[
-        "enable_llm_cache_for_entity_extract"
-    ]
     ordered_chunks = list(chunks.items())

     # add language and example number params to prompt
@@ -449,51 +498,7 @@

     graph_db_lock = get_graph_db_lock(enable_logging=False)

-    async def _user_llm_func_with_cache(
-        input_text: str, history_messages: list[dict[str, str]] = None
-    ) -> str:
-        if enable_llm_cache_for_entity_extract and llm_response_cache:
-            if history_messages:
-                history = json.dumps(history_messages, ensure_ascii=False)
-                _prompt = history + "\n" + input_text
-            else:
-                _prompt = input_text
-
-            # TODO: add cache_type="extract"
-            arg_hash = compute_args_hash(_prompt)
-            cached_return, _1, _2, _3 = await handle_cache(
-                llm_response_cache,
-                arg_hash,
-                _prompt,
-                "default",
-                cache_type="extract",
-            )
-            if cached_return:
-                logger.debug(f"Found cache for {arg_hash}")
-                statistic_data["llm_cache"] += 1
-                return cached_return
-            statistic_data["llm_call"] += 1
-            if history_messages:
-                res: str = await use_llm_func(
-                    input_text, history_messages=history_messages
-                )
-            else:
-                res: str = await use_llm_func(input_text)
-            await save_to_cache(
-                llm_response_cache,
-                CacheData(
-                    args_hash=arg_hash,
-                    content=res,
-                    prompt=_prompt,
-                    cache_type="extract",
-                ),
-            )
-            return res
-
-        if history_messages:
-            return await use_llm_func(input_text, history_messages=history_messages)
-        else:
-            return await use_llm_func(input_text)
+    # Use the global use_llm_func_with_cache function from utils.py

     async def _process_extraction_result(
         result: str, chunk_key: str, file_path: str = "unknown_source"
@@ -558,7 +563,12 @@
             **context_base, input_text="{input_text}"
         ).format(**context_base, input_text=content)

-        final_result = await _user_llm_func_with_cache(hint_prompt)
+        final_result = await use_llm_func_with_cache(
+            hint_prompt,
+            use_llm_func,
+            llm_response_cache=llm_response_cache,
+            cache_type="extract",
+        )
         history = pack_user_ass_to_openai_messages(hint_prompt, final_result)

         # Process initial extraction with file path
@@ -568,8 +578,12 @@

         # Process additional gleaning results
         for now_glean_index in range(entity_extract_max_gleaning):
-            glean_result = await _user_llm_func_with_cache(
-                continue_prompt, history_messages=history
+            glean_result = await use_llm_func_with_cache(
+                continue_prompt,
+                use_llm_func,
+                llm_response_cache=llm_response_cache,
+                history_messages=history,
+                cache_type="extract",
             )

             history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
@@ -588,8 +602,12 @@
             if now_glean_index == entity_extract_max_gleaning - 1:
                 break

-            if_loop_result: str = await _user_llm_func_with_cache(
-                if_loop_prompt, history_messages=history
+            if_loop_result: str = await use_llm_func_with_cache(
+                if_loop_prompt,
+                use_llm_func,
+                llm_response_cache=llm_response_cache,
+                history_messages=history,
+                cache_type="extract",
             )
             if_loop_result = if_loop_result.strip().strip('"').strip("'").lower()
             if if_loop_result != "yes":
@@ -613,7 +631,13 @@

             # Process and update entities
             for entity_name, entities in maybe_nodes.items():
                 entity_data = await _merge_nodes_then_upsert(
-                    entity_name, entities, knowledge_graph_inst, global_config
+                    entity_name,
+                    entities,
+                    knowledge_graph_inst,
+                    global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
                 )
                 chunk_entities_data.append(entity_data)
@@ -627,6 +651,9 @@
                     edges,
                     knowledge_graph_inst,
                     global_config,
+                    pipeline_status,
+                    pipeline_status_lock,
+                    llm_response_cache,
                 )
                 chunk_relationships_data.append(edge_data)

diff --git a/lightrag/utils.py b/lightrag/utils.py
index 4f1f8064..dd7f217a 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -12,13 +12,17 @@
 import re
 from dataclasses import dataclass
 from functools import wraps
 from hashlib import md5
-from typing import Any, Callable
+from typing import Any, Callable, TYPE_CHECKING
 import xml.etree.ElementTree as ET
 import numpy as np
 import tiktoken
 from lightrag.prompt import PROMPTS
 from dotenv import load_dotenv
+# Use TYPE_CHECKING to avoid circular imports
+if TYPE_CHECKING:
+    from lightrag.base import BaseKVStorage
+
 # use the .env that is inside the current folder
 # allows to use different .env file for each lightrag instance
 # the OS environment variables take precedence over the .env file
@@ -908,6 +912,84 @@ def lazy_external_import(module_name: str, class_name: str) -> Callable[..., Any]:
     return import_class


+async def use_llm_func_with_cache(
+    input_text: str,
+    use_llm_func: callable,
+    llm_response_cache: "BaseKVStorage | None" = None,
+    max_tokens: int = None,
+    history_messages: list[dict[str, str]] = None,
+    cache_type: str = "extract",
+) -> str:
+    """Call LLM function with cache support
+
+    If cache is available and enabled (determined by handle_cache based on mode),
+    retrieve result from cache; otherwise call LLM function and save result to cache.
+
+    Args:
+        input_text: Input text to send to LLM
+        use_llm_func: LLM function to call
+        llm_response_cache: Cache storage instance
+        max_tokens: Maximum tokens for generation
+        history_messages: History messages list
+        cache_type: Type of cache
+
+    Returns:
+        LLM response text
+    """
+    if llm_response_cache:
+        if history_messages:
+            history = json.dumps(history_messages, ensure_ascii=False)
+            _prompt = history + "\n" + input_text
+        else:
+            _prompt = input_text
+
+        arg_hash = compute_args_hash(_prompt)
+        cached_return, _1, _2, _3 = await handle_cache(
+            llm_response_cache,
+            arg_hash,
+            _prompt,
+            "default",
+            cache_type=cache_type,
+        )
+        if cached_return:
+            logger.debug(f"Found cache for {arg_hash}")
+            statistic_data["llm_cache"] += 1
+            return cached_return
+        statistic_data["llm_call"] += 1
+
+        # Call LLM
+        kwargs = {}
+        if history_messages:
+            kwargs["history_messages"] = history_messages
+        if max_tokens is not None:
+            kwargs["max_tokens"] = max_tokens
+
+        res: str = await use_llm_func(input_text, **kwargs)
+
+        # Save to cache
+        logger.info(f"Saving LLM cache for {arg_hash}")
+        await save_to_cache(
+            llm_response_cache,
+            CacheData(
+                args_hash=arg_hash,
+                content=res,
+                prompt=_prompt,
+                cache_type=cache_type,
+            ),
+        )
+        return res
+
+    # When cache is disabled, directly call LLM
+    kwargs = {}
+    if history_messages:
+        kwargs["history_messages"] = history_messages
+    if max_tokens is not None:
+        kwargs["max_tokens"] = max_tokens
+
+    logger.info(f"Call LLM function with query text length: {len(input_text)}")
+    return await use_llm_func(input_text, **kwargs)
+
+
 def get_content_summary(content: str, max_length: int = 250) -> str:
     """Get summary of document content
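Note: below is a minimal, hypothetical usage sketch of the new use_llm_func_with_cache helper added to lightrag/utils.py in this diff. The fake_llm coroutine and the example prompt are stand-ins for the real model function normally supplied via global_config["llm_model_func"]; passing llm_response_cache=None exercises the cache-disabled branch, which simply forwards the call (and any max_tokens / history_messages kwargs) to the LLM function.

```python
import asyncio

from lightrag.utils import use_llm_func_with_cache


async def fake_llm(prompt: str, history_messages=None, max_tokens=None) -> str:
    # Hypothetical stand-in for a real async LLM call (e.g. an OpenAI wrapper).
    return f"summary({len(prompt)} chars, history={bool(history_messages)}, max_tokens={max_tokens})"


async def main() -> None:
    # No cache storage is passed, so the helper calls fake_llm directly,
    # forwarding max_tokens via **kwargs as implemented in the diff above.
    result = await use_llm_func_with_cache(
        "Combined description of entity 'Alice' ...",
        fake_llm,
        llm_response_cache=None,
        max_tokens=500,
        cache_type="extract",
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

With a real KV storage instance passed as llm_response_cache, the same call would first consult handle_cache and only invoke the LLM (and save the result) on a cache miss, which is the behavior the refactored extract_entities and _handle_entity_relation_summary rely on.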