Add API endpoint to retrieve document indexing pipeline status

• GET /pipeline_status endpoint added
• Returns current pipeline processing state
This commit is contained in:
yangdx
2025-02-28 12:21:50 +08:00
parent b2da69b7f1
commit 04bd5413c9

View File

@@ -653,6 +653,35 @@ def create_document_routes(
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.get("/pipeline_status", dependencies=[Depends(optional_api_key)])
async def get_pipeline_status():
"""
Get the current status of the document indexing pipeline.
This endpoint returns information about the current state of the document processing pipeline,
including whether it's busy, the current job name, when it started, how many documents
are being processed, how many batches there are, and which batch is currently being processed.
Returns:
dict: A dictionary containing the pipeline status information
"""
try:
from lightrag.kg.shared_storage import get_namespace_data
pipeline_status = get_namespace_data("pipeline_status")
# Convert to regular dict if it's a Manager.dict
status_dict = dict(pipeline_status)
# Format the job_start time if it exists
if status_dict.get("job_start"):
status_dict["job_start"] = str(status_dict["job_start"])
return status_dict
except Exception as e:
logging.error(f"Error getting pipeline status: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.get("", dependencies=[Depends(optional_api_key)]) @router.get("", dependencies=[Depends(optional_api_key)])
async def documents() -> DocsStatusesResponse: async def documents() -> DocsStatusesResponse:
""" """