From 6b0acce6440dea3e438b10f5e28208277440572b Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 10 Mar 2025 01:45:58 +0800
Subject: [PATCH] Avoid redundant llm cache updates

---
 lightrag/kg/json_doc_status_impl.py |  3 ++-
 lightrag/kg/json_kv_impl.py         |  3 ++-
 lightrag/utils.py                   | 30 ++++++++++++++++++++++++++----
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index c33059ad..3c1fb4c2 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -20,6 +20,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )


@@ -95,7 +96,7 @@ class JsonDocStatusStorage(DocStatusStorage):

     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index c69b53ec..b5d963fb 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -18,6 +18,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )


@@ -57,7 +58,7 @@ class JsonKVStorage(BaseKVStorage):

     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 02c3236d..56548420 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -705,9 +705,22 @@ class CacheData:


 async def save_to_cache(hashing_kv, cache_data: CacheData):
-    if hashing_kv is None or hasattr(cache_data.content, "__aiter__"):
+    """Save data to cache, with improved handling for streaming responses and duplicate content.
+
+    Args:
+        hashing_kv: The key-value storage for caching
+        cache_data: The cache data to save
+    """
+    # Skip if storage is None or content is empty
+    if hashing_kv is None or not cache_data.content:
         return
-
+
+    # If content is a streaming response, don't cache it
+    if hasattr(cache_data.content, "__aiter__"):
+        logger.debug("Streaming response detected, skipping cache")
+        return
+
+    # Get existing cache data
     if exists_func(hashing_kv, "get_by_mode_and_id"):
         mode_cache = (
             await hashing_kv.get_by_mode_and_id(cache_data.mode, cache_data.args_hash)
@@ -715,7 +728,15 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         )
     else:
         mode_cache = await hashing_kv.get_by_id(cache_data.mode) or {}
-
+
+    # Check if we already have identical content cached
+    if cache_data.args_hash in mode_cache:
+        existing_content = mode_cache[cache_data.args_hash].get("return")
+        if existing_content == cache_data.content:
+            logger.info(f"Cache content unchanged for {cache_data.args_hash}, skipping update")
+            return
+
+    # Update cache with new content
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
         "cache_type": cache_data.cache_type,
@@ -729,7 +750,8 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         "embedding_max": cache_data.max_val,
         "original_prompt": cache_data.prompt,
     }
-
+
+    # Persist the updated cache
     await hashing_kv.upsert({cache_data.mode: mode_cache})

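
Reviewer note on the kg/* changes: in multi-process mode, storage_updated is a
multiprocessing.Value rather than a plain bool, and a Value wrapper is always
truthy as an object, so the old `if self.storage_updated:` test presumably fired
on every callback and triggered redundant flushes; the payload has to be read
through .value. A minimal sketch of that pattern, assuming an illustrative
flag_is_set helper (not part of lightrag's shared_storage API):

    from multiprocessing import Value

    def flag_is_set(flag, is_multiprocess: bool) -> bool:
        # A multiprocessing.Value is always truthy as an object,
        # so its payload must be read through .value.
        return bool(flag.value) if is_multiprocess else bool(flag)

    single = False              # single-process mode: plain bool
    shared = Value("b", False)  # multi-process mode: shared byte flag

    print(flag_is_set(single, False))  # False
    print(flag_is_set(shared, True))   # False
    shared.value = True
    print(flag_is_set(shared, True))   # True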
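
Reviewer note on the utils.py change: the duplicate-content check is what
actually avoids the redundant LLM cache updates named in the subject line. A
standalone sketch of the guard, using a hypothetical FakeKV in-memory store and
a trimmed save helper (illustrative stand-ins, not lightrag classes):

    import asyncio

    class FakeKV:
        """Hypothetical in-memory stand-in for the hashing KV storage."""

        def __init__(self):
            self.data = {}
            self.upserts = 0

        async def get_by_id(self, key):
            return self.data.get(key)

        async def upsert(self, payload):
            self.upserts += 1
            self.data.update(payload)

    async def save(kv, mode, args_hash, content):
        # Mirrors the patched flow: bail out on empty content, then on
        # content identical to what is already cached, before writing.
        if kv is None or not content:
            return
        mode_cache = await kv.get_by_id(mode) or {}
        if args_hash in mode_cache and mode_cache[args_hash].get("return") == content:
            return  # unchanged -> no redundant upsert
        mode_cache[args_hash] = {"return": content}
        await kv.upsert({mode: mode_cache})

    async def main():
        kv = FakeKV()
        await save(kv, "global", "abc123", "answer")
        await save(kv, "global", "abc123", "answer")  # identical content
        print(kv.upserts)  # 1 -- the second call skipped the upsert

    asyncio.run(main())

Note that the early return also leaves the embedding fields stored alongside the
content untouched; they are only rewritten when the content itself changes.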