feat: support aget_docs_by_ids
This commit is contained in:
@@ -1517,6 +1517,70 @@ class LightRAG:
|
||||
"""
|
||||
return await self.doc_status.get_docs_by_status(status)
|
||||
|
||||
async def aget_docs_by_ids(
|
||||
self, ids: str | list[str]
|
||||
) -> dict[str, DocProcessingStatus]:
|
||||
"""Retrieves the processing status for one or more documents by their IDs.
|
||||
|
||||
Args:
|
||||
ids: A single document ID (string) or a list of document IDs (list of strings).
|
||||
|
||||
Returns:
|
||||
A dictionary where keys are the document IDs for which a status was found,
|
||||
and values are the corresponding DocProcessingStatus objects. IDs that
|
||||
are not found in the storage will be omitted from the result dictionary.
|
||||
"""
|
||||
if isinstance(ids, str):
|
||||
# Ensure input is always a list of IDs for uniform processing
|
||||
id_list = [ids]
|
||||
elif (
|
||||
ids is None
|
||||
): # Handle potential None input gracefully, although type hint suggests str/list
|
||||
logger.warning(
|
||||
"aget_docs_by_ids called with None input, returning empty dict."
|
||||
)
|
||||
return {}
|
||||
else:
|
||||
# Assume input is already a list if not a string
|
||||
id_list = ids
|
||||
|
||||
# Return early if the final list of IDs is empty
|
||||
if not id_list:
|
||||
logger.debug("aget_docs_by_ids called with an empty list of IDs.")
|
||||
return {}
|
||||
|
||||
# Create tasks to fetch document statuses concurrently using the doc_status storage
|
||||
tasks = [self.doc_status.get_by_id(doc_id) for doc_id in id_list]
|
||||
# Execute tasks concurrently and gather the results. Results maintain order.
|
||||
# Type hint indicates results can be DocProcessingStatus or None if not found.
|
||||
results_list: list[Optional[DocProcessingStatus]] = await asyncio.gather(*tasks)
|
||||
|
||||
# Build the result dictionary, mapping found IDs to their statuses
|
||||
found_statuses: dict[str, DocProcessingStatus] = {}
|
||||
# Keep track of IDs for which no status was found (for logging purposes)
|
||||
not_found_ids: list[str] = []
|
||||
|
||||
# Iterate through the results, correlating them back to the original IDs
|
||||
for i, status_obj in enumerate(results_list):
|
||||
doc_id = id_list[
|
||||
i
|
||||
] # Get the original ID corresponding to this result index
|
||||
if status_obj:
|
||||
# If a status object was returned (not None), add it to the result dict
|
||||
found_statuses[doc_id] = status_obj
|
||||
else:
|
||||
# If status_obj is None, the document ID was not found in storage
|
||||
not_found_ids.append(doc_id)
|
||||
|
||||
# Log a warning if any of the requested document IDs were not found
|
||||
if not_found_ids:
|
||||
logger.warning(
|
||||
f"Document statuses not found for the following IDs: {not_found_ids}"
|
||||
)
|
||||
|
||||
# Return the dictionary containing statuses only for the found document IDs
|
||||
return found_statuses
|
||||
|
||||
# TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
|
||||
# Document delete is not working properly for most of the storage implementations.
|
||||
async def adelete_by_doc_id(self, doc_id: str) -> None:
|
||||
|
Reference in New Issue
Block a user