Merge pull request #1419 from earayu/feat/support_aget_docs_by_ids

feat: Add `aget_docs_by_ids` to check document processing status
This commit is contained in:
Daniel.y
2025-04-22 11:09:54 +08:00
committed by GitHub

View File

@@ -1517,6 +1517,70 @@ class LightRAG:
"""
return await self.doc_status.get_docs_by_status(status)
async def aget_docs_by_ids(
self, ids: str | list[str]
) -> dict[str, DocProcessingStatus]:
"""Retrieves the processing status for one or more documents by their IDs.
Args:
ids: A single document ID (string) or a list of document IDs (list of strings).
Returns:
A dictionary where keys are the document IDs for which a status was found,
and values are the corresponding DocProcessingStatus objects. IDs that
are not found in the storage will be omitted from the result dictionary.
"""
if isinstance(ids, str):
# Ensure input is always a list of IDs for uniform processing
id_list = [ids]
elif (
ids is None
): # Handle potential None input gracefully, although type hint suggests str/list
logger.warning(
"aget_docs_by_ids called with None input, returning empty dict."
)
return {}
else:
# Assume input is already a list if not a string
id_list = ids
# Return early if the final list of IDs is empty
if not id_list:
logger.debug("aget_docs_by_ids called with an empty list of IDs.")
return {}
# Create tasks to fetch document statuses concurrently using the doc_status storage
tasks = [self.doc_status.get_by_id(doc_id) for doc_id in id_list]
# Execute tasks concurrently and gather the results. Results maintain order.
# Type hint indicates results can be DocProcessingStatus or None if not found.
results_list: list[Optional[DocProcessingStatus]] = await asyncio.gather(*tasks)
# Build the result dictionary, mapping found IDs to their statuses
found_statuses: dict[str, DocProcessingStatus] = {}
# Keep track of IDs for which no status was found (for logging purposes)
not_found_ids: list[str] = []
# Iterate through the results, correlating them back to the original IDs
for i, status_obj in enumerate(results_list):
doc_id = id_list[
i
] # Get the original ID corresponding to this result index
if status_obj:
# If a status object was returned (not None), add it to the result dict
found_statuses[doc_id] = status_obj
else:
# If status_obj is None, the document ID was not found in storage
not_found_ids.append(doc_id)
# Log a warning if any of the requested document IDs were not found
if not_found_ids:
logger.warning(
f"Document statuses not found for the following IDs: {not_found_ids}"
)
# Return the dictionary containing statuses only for the found document IDs
return found_statuses
# TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
# Document delete is not working properly for most of the storage implementations.
async def adelete_by_doc_id(self, doc_id: str) -> None: