Avoid redundant LLM cache updates
@@ -20,6 +20,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )
 
 
@@ -95,7 +96,7 @@ class JsonDocStatusStorage(DocStatusStorage):
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
@@ -18,6 +18,7 @@ from .shared_storage import (
     set_all_update_flags,
     clear_all_update_flags,
     try_initialize_namespace,
+    is_multiprocess,
 )
 
 
@@ -57,7 +58,7 @@ class JsonKVStorage(BaseKVStorage):
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
-            if self.storage_updated:
+            if (is_multiprocess and self.storage_updated.value) or (not is_multiprocess and self.storage_updated):
                 data_dict = (
                     dict(self._data) if hasattr(self._data, "_getvalue") else self._data
                 )
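In both JSON storage backends, the old guard `if self.storage_updated:` is replaced because in multiprocess mode `storage_updated` appears to be a shared `multiprocessing.Value`, and that wrapper object is always truthy regardless of the flag it holds, so the callback would persist data even when nothing changed. A minimal sketch of the pitfall and of the new condition (the `update_pending` helper is hypothetical, written only to mirror the diff's logic):

    from multiprocessing import Value

    is_multiprocess = True  # stand-in for the flag imported from .shared_storage

    flag = Value("b", False)   # shared boolean flag, initially False
    print(bool(flag))          # True  -- the Value wrapper is always truthy
    print(bool(flag.value))    # False -- the actual flag state

    def update_pending(storage_updated) -> bool:
        # Mirrors the new condition in index_done_callback.
        if is_multiprocess:
            return bool(storage_updated.value)  # read the shared flag's value
        return bool(storage_updated)            # plain bool in single-process mode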
@@ -705,9 +705,22 @@ class CacheData:
 
 
 async def save_to_cache(hashing_kv, cache_data: CacheData):
-    if hashing_kv is None or hasattr(cache_data.content, "__aiter__"):
+    """Save data to cache, with improved handling for streaming responses and duplicate content.
+
+    Args:
+        hashing_kv: The key-value storage for caching
+        cache_data: The cache data to save
+    """
+    # Skip if storage is None or content is a streaming response
+    if hashing_kv is None or not cache_data.content:
         return
+
+    # If content is a streaming response, don't cache it
+    if hasattr(cache_data.content, "__aiter__"):
+        logger.debug("Streaming response detected, skipping cache")
+        return
+
     # Get existing cache data
     if exists_func(hashing_kv, "get_by_mode_and_id"):
         mode_cache = (
             await hashing_kv.get_by_mode_and_id(cache_data.mode, cache_data.args_hash)
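The new early return relies on `hasattr(content, "__aiter__")` to spot streaming responses: streamed LLM output is typically an async iterator, and async generators define `__aiter__` while a fully materialized string does not. A small illustrative check (`token_stream` is a made-up stand-in, not part of the codebase):

    async def token_stream():  # hypothetical stand-in for a streaming LLM reply
        yield "Hello"
        yield " world"

    print(hasattr(token_stream(), "__aiter__"))  # True  -> skip caching
    print(hasattr("Hello world", "__aiter__"))   # False -> safe to cache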
@@ -715,7 +728,15 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         )
     else:
         mode_cache = await hashing_kv.get_by_id(cache_data.mode) or {}
 
+    # Check if we already have identical content cached
+    if cache_data.args_hash in mode_cache:
+        existing_content = mode_cache[cache_data.args_hash].get("return")
+        if existing_content == cache_data.content:
+            logger.info(f"Cache content unchanged for {cache_data.args_hash}, skipping update")
+            return
+
+    # Update cache with new content
     mode_cache[cache_data.args_hash] = {
         "return": cache_data.content,
         "cache_type": cache_data.cache_type,
@@ -729,7 +750,8 @@ async def save_to_cache(hashing_kv, cache_data: CacheData):
         "embedding_max": cache_data.max_val,
         "original_prompt": cache_data.prompt,
     }
 
+    # Only upsert if there's actual new content
     await hashing_kv.upsert({cache_data.mode: mode_cache})
 
 
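Taken together, the new checks mean `save_to_cache` only calls `upsert` when the cached entry would actually change. A condensed, self-contained sketch of that dedup path (`StubKV` and `save` are illustrative stand-ins, not the project's storage API):

    import asyncio

    class StubKV:
        """In-memory stand-in for hashing_kv."""
        def __init__(self):
            self.data = {}
            self.upserts = 0

        async def get_by_id(self, key):
            return self.data.get(key)

        async def upsert(self, payload):
            self.upserts += 1
            self.data.update(payload)

    async def save(kv, mode, args_hash, content):
        # Condensed version of the dedup logic added above.
        mode_cache = await kv.get_by_id(mode) or {}
        if mode_cache.get(args_hash, {}).get("return") == content:
            return  # identical content already cached: skip the upsert
        mode_cache[args_hash] = {"return": content}
        await kv.upsert({mode: mode_cache})

    async def demo():
        kv = StubKV()
        await save(kv, "default", "abc123", "answer")
        await save(kv, "default", "abc123", "answer")  # unchanged -> skipped
        print(kv.upserts)  # 1

    asyncio.run(demo())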