diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index e545c650..f5df0833 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -39,7 +39,7 @@ class JsonKVStorage(BaseKVStorage):
         ]

     async def filter_keys(self, data: set[str]) -> set[str]:
-        return data - set(self._data.keys())
+        return data.difference(self._data.keys())

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         left_data = {k: v for k, v in data.items() if k not in self._data}
diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py
index 2ff06d3a..35d31fba 100644
--- a/lightrag/kg/jsondocstatus_impl.py
+++ b/lightrag/kg/jsondocstatus_impl.py
@@ -76,7 +76,7 @@ class JsonDocStatusStorage(DocStatusStorage):

     async def filter_keys(self, data: set[str]) -> set[str]:
         """Return keys that should be processed (not in storage or not successfully processed)"""
-        return set(k for k in data if k not in self._data)
+        return data.difference(self._data.keys())

     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
         result: list[dict[str, Any]] = []
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 6ff283c6..bf395e29 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -409,7 +409,7 @@ class LightRAG:
            doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
            new_docs = {doc_key: {"content": full_text.strip()}}

-            _add_doc_keys = await self.full_docs.filter_keys([doc_key])
+            _add_doc_keys = await self.full_docs.filter_keys({doc_key})
            new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
            if not len(new_docs):
                logger.warning("This document is already in the storage.")
@@ -418,7 +418,7 @@
            update_storage = True
            logger.info(f"[New Docs] inserting {len(new_docs)} docs")

-            inserting_chunks = {}
+            inserting_chunks: dict[str, Any] = {}
            for chunk_text in text_chunks:
                chunk_text_stripped = chunk_text.strip()
                chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
@@ -428,11 +428,10 @@
                    "full_doc_id": doc_key,
                }

-            _add_chunk_keys = await self.text_chunks.filter_keys(
-                list(inserting_chunks.keys())
-            )
+            chunk_ids = set(inserting_chunks.keys())
+            add_chunk_keys = await self.text_chunks.filter_keys(chunk_ids)
            inserting_chunks = {
-                k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
+                k: v for k, v in inserting_chunks.items() if k in add_chunk_keys
            }
            if not len(inserting_chunks):
                logger.warning("All chunks are already in the storage.")
@@ -539,7 +538,7 @@
            logger.info("All documents have been processed or are duplicates")
            return

-        to_process_docs_ids = list(to_process_docs.keys())
+        to_process_docs_ids = set(to_process_docs.keys())
        # Get already processed documents (text chunks and full docs)
        text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
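
Review note, not part of the patch: a minimal sketch of the filter_keys contract the hunks above rely on, and of the one-element-set pitfall corrected at the lightrag.py call site. The stored/candidates names below are illustrative only, not identifiers from the repository.

    # filter_keys(data) must return the keys that still need processing,
    # i.e. the candidates NOT already in storage. The set difference must
    # therefore run data - stored, never stored - data.
    stored = {"doc-a", "doc-b"}        # keys already present in self._data
    candidates = {"doc-b", "doc-c"}    # keys passed in by the caller
    assert candidates.difference(stored) == {"doc-c"}   # correct direction
    assert stored.difference(candidates) == {"doc-a"}   # reversed: wrong keys

    # Pitfall at the single-key call site: set(doc_key) explodes the string
    # into its characters, while {doc_key} builds the intended one-element set.
    doc_key = "doc-42"
    assert set(doc_key) == {"d", "o", "c", "-", "4", "2"}
    assert {doc_key} == {"doc-42"}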