diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 4502397b..57a34ae5 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -20,7 +20,6 @@ from .shared_storage import ( set_all_update_flags, clear_all_update_flags, try_initialize_namespace, - is_multiprocess, ) @@ -96,9 +95,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def index_done_callback(self) -> None: async with self._storage_lock: - if (is_multiprocess and self.storage_updated.value) or ( - not is_multiprocess and self.storage_updated - ): + if self.storage_updated.value: data_dict = ( dict(self._data) if hasattr(self._data, "_getvalue") else self._data ) diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 80abe92e..e7deaf15 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -18,7 +18,6 @@ from .shared_storage import ( set_all_update_flags, clear_all_update_flags, try_initialize_namespace, - is_multiprocess, ) @@ -63,9 +62,7 @@ class JsonKVStorage(BaseKVStorage): async def index_done_callback(self) -> None: async with self._storage_lock: - if (is_multiprocess and self.storage_updated.value) or ( - not is_multiprocess and self.storage_updated - ): + if self.storage_updated.value: data_dict = ( dict(self._data) if hasattr(self._data, "_getvalue") else self._data ) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 63ff1f0d..9bf072be 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -322,7 +322,12 @@ async def get_update_flag(namespace: str): if is_multiprocess and _manager is not None: new_update_flag = _manager.Value("b", False) else: - new_update_flag = False + # Create a simple mutable object to store boolean value for compatibility with mutiprocess + class MutableBoolean: + def __init__(self, initial_value=False): + self.value = initial_value + + new_update_flag = MutableBoolean(False) _update_flags[namespace].append(new_update_flag) return new_update_flag @@ -342,7 +347,8 @@ async def set_all_update_flags(namespace: str): if is_multiprocess: _update_flags[namespace][i].value = True else: - _update_flags[namespace][i] = True + # Use .value attribute instead of direct assignment + _update_flags[namespace][i].value = True async def clear_all_update_flags(namespace: str): @@ -359,7 +365,8 @@ async def clear_all_update_flags(namespace: str): if is_multiprocess: _update_flags[namespace][i].value = False else: - _update_flags[namespace][i] = False + # Use .value attribute instead of direct assignment + _update_flags[namespace][i].value = False async def get_all_update_flags_status() -> Dict[str, list]: