Merge pull request #1068 from danielaskdd/add-pipeline-status-response-obj
Add PipelineStatusResponse model for API endpoint
@@ -99,6 +99,37 @@ class DocsStatusesResponse(BaseModel):
     statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
 
 
+class PipelineStatusResponse(BaseModel):
+    """Response model for pipeline status
+
+    Attributes:
+        autoscanned: Whether auto-scan has started
+        busy: Whether the pipeline is currently busy
+        job_name: Current job name (e.g., indexing files/indexing texts)
+        job_start: Job start time as ISO format string (optional)
+        docs: Total number of documents to be indexed
+        batchs: Number of batches for processing documents
+        cur_batch: Current processing batch
+        request_pending: Flag for pending request for processing
+        latest_message: Latest message from pipeline processing
+        history_messages: List of history messages
+    """
+
+    autoscanned: bool = False
+    busy: bool = False
+    job_name: str = "Default Job"
+    job_start: Optional[str] = None
+    docs: int = 0
+    batchs: int = 0
+    cur_batch: int = 0
+    request_pending: bool = False
+    latest_message: str = ""
+    history_messages: Optional[List[str]] = None
+
+    class Config:
+        extra = "allow"  # Allow additional fields from the pipeline status
+
+
 class DocumentManager:
     def __init__(
         self,
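Note on the model added above: because of extra = "allow", keys present in the shared pipeline status dict but not declared on the model are kept rather than rejected. A minimal usage sketch, assuming the pydantic v1-style Config shown in the diff (the extra key "update_flag" is purely illustrative):

    # "update_flag" is not a declared field, but Config.extra = "allow" keeps it
    status = PipelineStatusResponse(busy=True, job_name="indexing files", update_flag=True)
    print(status.dict())  # v1-style dump; includes the undeclared "update_flag" key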
@@ -247,7 +278,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                 if global_args["main_args"].document_loading_engine == "DOCLING":
                     if not pm.is_installed("docling"):  # type: ignore
                         pm.install("docling")
-                    from docling.document_converter import DocumentConverter
+                    from docling.document_converter import DocumentConverter  # type: ignore
 
                     converter = DocumentConverter()
                     result = converter.convert(file_path)
@@ -266,7 +297,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                 if global_args["main_args"].document_loading_engine == "DOCLING":
                     if not pm.is_installed("docling"):  # type: ignore
                         pm.install("docling")
-                    from docling.document_converter import DocumentConverter
+                    from docling.document_converter import DocumentConverter  # type: ignore
 
                     converter = DocumentConverter()
                     result = converter.convert(file_path)
@@ -286,7 +317,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                 if global_args["main_args"].document_loading_engine == "DOCLING":
                     if not pm.is_installed("docling"):  # type: ignore
                         pm.install("docling")
-                    from docling.document_converter import DocumentConverter
+                    from docling.document_converter import DocumentConverter  # type: ignore
 
                     converter = DocumentConverter()
                     result = converter.convert(file_path)
@@ -307,7 +338,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
                 if global_args["main_args"].document_loading_engine == "DOCLING":
                     if not pm.is_installed("docling"):  # type: ignore
                         pm.install("docling")
-                    from docling.document_converter import DocumentConverter
+                    from docling.document_converter import DocumentConverter  # type: ignore
 
                     converter = DocumentConverter()
                     result = converter.convert(file_path)
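The four hunks above only add a "# type: ignore" marker to the deferred docling import; the DOCLING loading path itself is unchanged. As a rough sketch of what that path does (pm is a runtime installer per the diff; export_to_markdown is an assumption, the actual export call in LightRAG may differ):

    from docling.document_converter import DocumentConverter  # type: ignore

    converter = DocumentConverter()
    result = converter.convert("example.pdf")        # path of the file being enqueued
    content = result.document.export_to_markdown()   # text extraction from the converted document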
@@ -718,17 +749,33 @@ def create_document_routes(
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))
 
-    @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)])
-    async def get_pipeline_status():
+    @router.get(
+        "/pipeline_status",
+        dependencies=[Depends(optional_api_key)],
+        response_model=PipelineStatusResponse,
+    )
+    async def get_pipeline_status() -> PipelineStatusResponse:
         """
         Get the current status of the document indexing pipeline.
 
         This endpoint returns information about the current state of the document processing pipeline,
-        including whether it's busy, the current job name, when it started, how many documents
-        are being processed, how many batches there are, and which batch is currently being processed.
+        including the processing status, progress information, and history messages.
 
         Returns:
-            dict: A dictionary containing the pipeline status information
+            PipelineStatusResponse: A response object containing:
+                - autoscanned (bool): Whether auto-scan has started
+                - busy (bool): Whether the pipeline is currently busy
+                - job_name (str): Current job name (e.g., indexing files/indexing texts)
+                - job_start (str, optional): Job start time as ISO format string
+                - docs (int): Total number of documents to be indexed
+                - batchs (int): Number of batches for processing documents
+                - cur_batch (int): Current processing batch
+                - request_pending (bool): Flag for pending request for processing
+                - latest_message (str): Latest message from pipeline processing
+                - history_messages (List[str], optional): List of history messages
+
+        Raises:
+            HTTPException: If an error occurs while retrieving pipeline status (500)
         """
         try:
             from lightrag.kg.shared_storage import get_namespace_data
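With response_model=PipelineStatusResponse on the route, FastAPI now validates the response and documents its schema in OpenAPI. A hypothetical client-side call (base URL, the "/documents" route prefix, and the X-API-Key header name are assumptions about a typical LightRAG server setup, not part of this diff):

    import httpx

    resp = httpx.get(
        "http://localhost:9621/documents/pipeline_status",
        headers={"X-API-Key": "your-api-key"},  # only needed when an API key is configured
    )
    resp.raise_for_status()
    status = resp.json()
    print(status["busy"], status["job_name"], status["latest_message"])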
@@ -746,7 +793,7 @@ def create_document_routes(
             if status_dict.get("job_start"):
                 status_dict["job_start"] = str(status_dict["job_start"])
 
-            return status_dict
+            return PipelineStatusResponse(**status_dict)
         except Exception as e:
             logger.error(f"Error getting pipeline status: {str(e)}")
             logger.error(traceback.format_exc())
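The return statement now builds the typed response from the shared status dict. The preceding job_start handling matters because the shared state may hold a datetime, which would not validate against the Optional[str] field. A small sketch of the same conversion (the datetime value here is illustrative):

    from datetime import datetime

    status_dict = {"busy": True, "job_start": datetime.now(), "history_messages": []}
    if status_dict.get("job_start"):
        status_dict["job_start"] = str(status_dict["job_start"])  # ISO-like string for the model
    response = PipelineStatusResponse(**status_dict)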