cleaned set

This commit is contained in:
Yannick Stephan
2025-02-09 19:56:12 +01:00
parent 6480ddee5d
commit 62115b836f
3 changed files with 8 additions and 9 deletions

View File

@@ -39,7 +39,7 @@ class JsonKVStorage(BaseKVStorage):
] ]
async def filter_keys(self, data: set[str]) -> set[str]: async def filter_keys(self, data: set[str]) -> set[str]:
return data - set(self._data.keys()) return set(self._data.keys()).difference(data)
async def upsert(self, data: dict[str, dict[str, Any]]) -> None: async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
left_data = {k: v for k, v in data.items() if k not in self._data} left_data = {k: v for k, v in data.items() if k not in self._data}

View File

@@ -76,7 +76,7 @@ class JsonDocStatusStorage(DocStatusStorage):
async def filter_keys(self, data: set[str]) -> set[str]: async def filter_keys(self, data: set[str]) -> set[str]:
"""Return keys that should be processed (not in storage or not successfully processed)""" """Return keys that should be processed (not in storage or not successfully processed)"""
return set(k for k in data if k not in self._data) return set(self._data.keys()).difference(data)
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
result: list[dict[str, Any]] = [] result: list[dict[str, Any]] = []

View File

@@ -409,7 +409,7 @@ class LightRAG:
doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-") doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
new_docs = {doc_key: {"content": full_text.strip()}} new_docs = {doc_key: {"content": full_text.strip()}}
_add_doc_keys = await self.full_docs.filter_keys([doc_key]) _add_doc_keys = await self.full_docs.filter_keys(set(doc_key))
new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys} new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
if not len(new_docs): if not len(new_docs):
logger.warning("This document is already in the storage.") logger.warning("This document is already in the storage.")
@@ -418,7 +418,7 @@ class LightRAG:
update_storage = True update_storage = True
logger.info(f"[New Docs] inserting {len(new_docs)} docs") logger.info(f"[New Docs] inserting {len(new_docs)} docs")
inserting_chunks = {} inserting_chunks: dict[str, Any] = {}
for chunk_text in text_chunks: for chunk_text in text_chunks:
chunk_text_stripped = chunk_text.strip() chunk_text_stripped = chunk_text.strip()
chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-") chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
@@ -428,11 +428,10 @@ class LightRAG:
"full_doc_id": doc_key, "full_doc_id": doc_key,
} }
_add_chunk_keys = await self.text_chunks.filter_keys( doc_ids = set(inserting_chunks.keys())
list(inserting_chunks.keys()) add_chunk_keys = await self.text_chunks.filter_keys(doc_ids)
)
inserting_chunks = { inserting_chunks = {
k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys k: v for k, v in inserting_chunks.items() if k in add_chunk_keys
} }
if not len(inserting_chunks): if not len(inserting_chunks):
logger.warning("All chunks are already in the storage.") logger.warning("All chunks are already in the storage.")
@@ -539,7 +538,7 @@ class LightRAG:
logger.info("All documents have been processed or are duplicates") logger.info("All documents have been processed or are duplicates")
return return
to_process_docs_ids = list(to_process_docs.keys()) to_process_docs_ids = set(to_process_docs.keys())
# Get allready processed documents (text chunks and full docs) # Get allready processed documents (text chunks and full docs)
text_chunks_processed_doc_ids = await self.text_chunks.filter_keys( text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(