Fix update status handling bugs in drop function of json kv storage
This commit is contained in:
@@ -141,9 +141,14 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
async with self._storage_lock:
|
async with self._storage_lock:
|
||||||
|
any_deleted = False
|
||||||
for doc_id in doc_ids:
|
for doc_id in doc_ids:
|
||||||
self._data.pop(doc_id, None)
|
result = self._data.pop(doc_id, None)
|
||||||
await set_all_update_flags(self.namespace)
|
if result is not None:
|
||||||
|
any_deleted = True
|
||||||
|
|
||||||
|
if any_deleted:
|
||||||
|
await set_all_update_flags(self.namespace)
|
||||||
|
|
||||||
async def drop(self) -> dict[str, str]:
|
async def drop(self) -> dict[str, str]:
|
||||||
"""Drop all document status data from storage and clean up resources
|
"""Drop all document status data from storage and clean up resources
|
||||||
@@ -160,7 +165,9 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
async with self._storage_lock:
|
async with self._storage_lock:
|
||||||
self._data.update({})
|
self._data.clear()
|
||||||
|
await set_all_update_flags(self.namespace)
|
||||||
|
|
||||||
await self.index_done_callback()
|
await self.index_done_callback()
|
||||||
logger.info(f"Process {os.getpid()} drop {self.namespace}")
|
logger.info(f"Process {os.getpid()} drop {self.namespace}")
|
||||||
return {"status": "success", "message": "data dropped"}
|
return {"status": "success", "message": "data dropped"}
|
||||||
|
@@ -140,9 +140,14 @@ class JsonKVStorage(BaseKVStorage):
|
|||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
async with self._storage_lock:
|
async with self._storage_lock:
|
||||||
|
any_deleted = False
|
||||||
for doc_id in ids:
|
for doc_id in ids:
|
||||||
self._data.pop(doc_id, None)
|
result = self._data.pop(doc_id, None)
|
||||||
await set_all_update_flags(self.namespace)
|
if result is not None:
|
||||||
|
any_deleted = True
|
||||||
|
|
||||||
|
if any_deleted:
|
||||||
|
await set_all_update_flags(self.namespace)
|
||||||
|
|
||||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||||
"""Delete specific records from storage by by cache mode
|
"""Delete specific records from storage by by cache mode
|
||||||
@@ -183,7 +188,9 @@ class JsonKVStorage(BaseKVStorage):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
async with self._storage_lock:
|
async with self._storage_lock:
|
||||||
self._data.update({})
|
self._data.clear()
|
||||||
|
await set_all_update_flags(self.namespace)
|
||||||
|
|
||||||
await self.index_done_callback()
|
await self.index_done_callback()
|
||||||
logger.info(f"Process {os.getpid()} drop {self.namespace}")
|
logger.info(f"Process {os.getpid()} drop {self.namespace}")
|
||||||
return {"status": "success", "message": "data dropped"}
|
return {"status": "success", "message": "data dropped"}
|
||||||
|
Reference in New Issue
Block a user