async def aget_docs_by_ids(
    self, ids: str | list[str]
) -> dict[str, DocProcessingStatus]:
    """Retrieve the processing status of one or more documents by ID.

    Lookups are issued concurrently against ``self.doc_status``. Duplicate
    IDs are collapsed so each distinct ID is fetched only once.

    Args:
        ids: A single document ID, or a collection of document IDs. A list
            is the documented form, but any iterable of IDs (tuple, set,
            generator, ...) is accepted.

    Returns:
        A dictionary mapping each document ID for which a status was found
        to that status. IDs with no status in storage are omitted (and
        reported in a single warning log entry). ``None`` or an empty
        collection yields an empty dictionary.
    """
    # Tolerate None even though the type hint does not allow it.
    if ids is None:
        logger.warning(
            "aget_docs_by_ids called with None input, returning empty dict."
        )
        return {}

    if isinstance(ids, str):
        # A bare string is one ID, not an iterable of characters.
        id_list = [ids]
    else:
        # Materialise once (so generators/sets work) and drop duplicates
        # while keeping first-seen order; dicts preserve insertion order.
        id_list = list(dict.fromkeys(ids))

    if not id_list:
        logger.debug("aget_docs_by_ids called with an empty list of IDs.")
        return {}

    # asyncio.gather returns results in the same order as the awaitables
    # passed in, so results can be paired positionally with id_list.
    results = await asyncio.gather(
        *(self.doc_status.get_by_id(doc_id) for doc_id in id_list)
    )

    found_statuses: dict[str, DocProcessingStatus] = {}
    not_found_ids: list[str] = []
    for doc_id, status_obj in zip(id_list, results):
        # A falsy result (None, or an empty record) means "not found".
        if status_obj:
            found_statuses[doc_id] = status_obj
        else:
            not_found_ids.append(doc_id)

    if not_found_ids:
        logger.warning(
            f"Document statuses not found for the following IDs: {not_found_ids}"
        )

    return found_statuses