diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index c1666192..7b6f11c1 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -99,6 +99,37 @@ class DocsStatusesResponse(BaseModel): statuses: Dict[DocStatus, List[DocStatusResponse]] = {} +class PipelineStatusResponse(BaseModel): + """Response model for pipeline status + + Attributes: + autoscanned: Whether auto-scan has started + busy: Whether the pipeline is currently busy + job_name: Current job name (e.g., indexing files/indexing texts) + job_start: Job start time as ISO format string (optional) + docs: Total number of documents to be indexed + batchs: Number of batches for processing documents + cur_batch: Current processing batch + request_pending: Flag for pending request for processing + latest_message: Latest message from pipeline processing + history_messages: List of history messages + """ + + autoscanned: bool = False + busy: bool = False + job_name: str = "Default Job" + job_start: Optional[str] = None + docs: int = 0 + batchs: int = 0 + cur_batch: int = 0 + request_pending: bool = False + latest_message: str = "" + history_messages: Optional[List[str]] = None + + class Config: + extra = "allow" # Allow additional fields from the pipeline status + + class DocumentManager: def __init__( self, @@ -247,7 +278,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: if global_args["main_args"].document_loading_engine == "DOCLING": if not pm.is_installed("docling"): # type: ignore pm.install("docling") - from docling.document_converter import DocumentConverter + from docling.document_converter import DocumentConverter # type: ignore converter = DocumentConverter() result = converter.convert(file_path) @@ -266,7 +297,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: if global_args["main_args"].document_loading_engine == "DOCLING": if not pm.is_installed("docling"): # type: ignore pm.install("docling") - from docling.document_converter import DocumentConverter + from docling.document_converter import DocumentConverter # type: ignore converter = DocumentConverter() result = converter.convert(file_path) @@ -286,7 +317,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: if global_args["main_args"].document_loading_engine == "DOCLING": if not pm.is_installed("docling"): # type: ignore pm.install("docling") - from docling.document_converter import DocumentConverter + from docling.document_converter import DocumentConverter # type: ignore converter = DocumentConverter() result = converter.convert(file_path) @@ -307,7 +338,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool: if global_args["main_args"].document_loading_engine == "DOCLING": if not pm.is_installed("docling"): # type: ignore pm.install("docling") - from docling.document_converter import DocumentConverter + from docling.document_converter import DocumentConverter # type: ignore converter = DocumentConverter() result = converter.convert(file_path) @@ -718,17 +749,33 @@ def create_document_routes( logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)]) - async def get_pipeline_status(): + @router.get( + "/pipeline_status", + dependencies=[Depends(optional_api_key)], + response_model=PipelineStatusResponse, + ) + async def get_pipeline_status() -> PipelineStatusResponse: """ Get the current status of the document indexing pipeline. This endpoint returns information about the current state of the document processing pipeline, - including whether it's busy, the current job name, when it started, how many documents - are being processed, how many batches there are, and which batch is currently being processed. + including the processing status, progress information, and history messages. Returns: - dict: A dictionary containing the pipeline status information + PipelineStatusResponse: A response object containing: + - autoscanned (bool): Whether auto-scan has started + - busy (bool): Whether the pipeline is currently busy + - job_name (str): Current job name (e.g., indexing files/indexing texts) + - job_start (str, optional): Job start time as ISO format string + - docs (int): Total number of documents to be indexed + - batchs (int): Number of batches for processing documents + - cur_batch (int): Current processing batch + - request_pending (bool): Flag for pending request for processing + - latest_message (str): Latest message from pipeline processing + - history_messages (List[str], optional): List of history messages + + Raises: + HTTPException: If an error occurs while retrieving pipeline status (500) """ try: from lightrag.kg.shared_storage import get_namespace_data @@ -746,7 +793,7 @@ def create_document_routes( if status_dict.get("job_start"): status_dict["job_start"] = str(status_dict["job_start"]) - return status_dict + return PipelineStatusResponse(**status_dict) except Exception as e: logger.error(f"Error getting pipeline status: {str(e)}") logger.error(traceback.format_exc())