Add PipelineStatusResponse model for API endpoint

This commit is contained in:
yangdx
2025-03-12 16:16:39 +08:00
parent 4263c327ea
commit 16e06100e7

View File

@@ -99,6 +99,36 @@ class DocsStatusesResponse(BaseModel):
statuses: Dict[DocStatus, List[DocStatusResponse]] = {} statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
class PipelineStatusResponse(BaseModel):
    """Status snapshot of the document indexing pipeline.

    Every field carries a default value, and keys that are not declared
    below are still accepted (see ``Config``).

    Attributes:
        autoscanned: True once auto-scan has started.
        busy: True while the pipeline is currently busy.
        job_name: Name of the current job (e.g., indexing files/indexing texts).
        job_start: Start time of the job as an ISO format string, if any.
        docs: Total number of documents to be indexed.
        batchs: Number of batches used for processing the documents.
        cur_batch: Batch that is currently being processed.
        request_pending: True when a processing request is pending.
        latest_message: Most recent message from pipeline processing.
        history_messages: List of history messages, if any.
    """

    # Pipeline state flags
    autoscanned: bool = False
    busy: bool = False

    # Current job identity
    job_name: str = "Default Job"
    job_start: Optional[str] = None

    # Progress counters
    docs: int = 0
    batchs: int = 0
    cur_batch: int = 0

    # Pending-request flag
    request_pending: bool = False

    # Pipeline messages
    latest_message: str = ""
    history_messages: Optional[List[str]] = None

    class Config:
        # Allow additional fields from the pipeline status
        extra = "allow"
class DocumentManager: class DocumentManager:
def __init__( def __init__(
self, self,
@@ -247,7 +277,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING": if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore if not pm.is_installed("docling"): # type: ignore
pm.install("docling") pm.install("docling")
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter() converter = DocumentConverter()
result = converter.convert(file_path) result = converter.convert(file_path)
@@ -266,7 +296,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING": if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore if not pm.is_installed("docling"): # type: ignore
pm.install("docling") pm.install("docling")
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter() converter = DocumentConverter()
result = converter.convert(file_path) result = converter.convert(file_path)
@@ -286,7 +316,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING": if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore if not pm.is_installed("docling"): # type: ignore
pm.install("docling") pm.install("docling")
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter() converter = DocumentConverter()
result = converter.convert(file_path) result = converter.convert(file_path)
@@ -307,7 +337,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING": if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore if not pm.is_installed("docling"): # type: ignore
pm.install("docling") pm.install("docling")
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter() converter = DocumentConverter()
result = converter.convert(file_path) result = converter.convert(file_path)
@@ -718,17 +748,29 @@ def create_document_routes(
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.get("/pipeline_status", dependencies=[Depends(optional_api_key)]) @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)], response_model=PipelineStatusResponse)
async def get_pipeline_status(): async def get_pipeline_status() -> PipelineStatusResponse:
""" """
Get the current status of the document indexing pipeline. Get the current status of the document indexing pipeline.
This endpoint returns information about the current state of the document processing pipeline, This endpoint returns information about the current state of the document processing pipeline,
including whether it's busy, the current job name, when it started, how many documents including the processing status, progress information, and history messages.
are being processed, how many batches there are, and which batch is currently being processed.
Returns: Returns:
dict: A dictionary containing the pipeline status information PipelineStatusResponse: A response object containing:
- autoscanned (bool): Whether auto-scan has started
- busy (bool): Whether the pipeline is currently busy
- job_name (str): Current job name (e.g., indexing files/indexing texts)
- job_start (str, optional): Job start time as ISO format string
- docs (int): Total number of documents to be indexed
- batchs (int): Number of batches for processing documents
- cur_batch (int): Current processing batch
- request_pending (bool): Flag for pending request for processing
- latest_message (str): Latest message from pipeline processing
- history_messages (List[str], optional): List of history messages
Raises:
HTTPException: If an error occurs while retrieving pipeline status (500)
""" """
try: try:
from lightrag.kg.shared_storage import get_namespace_data from lightrag.kg.shared_storage import get_namespace_data
@@ -746,7 +788,7 @@ def create_document_routes(
if status_dict.get("job_start"): if status_dict.get("job_start"):
status_dict["job_start"] = str(status_dict["job_start"]) status_dict["job_start"] = str(status_dict["job_start"])
return status_dict return PipelineStatusResponse(**status_dict)
except Exception as e: except Exception as e:
logger.error(f"Error getting pipeline status: {str(e)}") logger.error(f"Error getting pipeline status: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())