From ca85b82a47207340235360628426e9ffe722876a Mon Sep 17 00:00:00 2001 From: ArnoChen Date: Mon, 17 Feb 2025 01:03:05 +0800 Subject: [PATCH] implement endpoint to retrieve document statuses --- lightrag/api/lightrag_server.py | 66 ++++++++++++++++++++++++++++++--- lightrag/lightrag.py | 10 +++++ 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index a392e67a..2973fa09 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -19,6 +19,7 @@ from lightrag import LightRAG, QueryParam from lightrag.types import GPTKeywordExtractionFormat from lightrag.api import __api_version__ from lightrag.utils import EmbeddingFunc +from lightrag.base import DocStatus, DocProcessingStatus from enum import Enum from pathlib import Path import shutil @@ -693,6 +694,22 @@ class InsertResponse(BaseModel): message: str +class DocStatusResponse(BaseModel): + id: str + content_summary: str + content_length: int + status: DocStatus + created_at: str + updated_at: str + chunks_count: Optional[int] = None + error: Optional[str] = None + metadata: Optional[dict[str, Any]] = None + + +class DocsStatusesResponse(BaseModel): + statuses: Dict[DocStatus, List[DocStatusResponse]] = {} + + def QueryRequestToQueryParams(request: QueryRequest): param = QueryParam(mode=request.mode, stream=request.stream) if request.only_need_context is not None: @@ -1728,20 +1745,57 @@ def create_app(args): app.include_router(ollama_api.router, prefix="/api") @app.get("/documents", dependencies=[Depends(optional_api_key)]) - async def documents(): - """Get current system status""" - return doc_manager.indexed_files + async def documents() -> DocsStatusesResponse: + """ + Get documents statuses + + Returns: + DocsStatusesResponse: A response object containing the status, message, and the number of indexed documents. + """ + try: + statuses = ( + DocStatus.PENDING, + DocStatus.PROCESSING, + DocStatus.PROCESSED, + DocStatus.FAILED, + ) + + tasks = [rag.get_docs_by_status(status) for status in statuses] + results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(*tasks) + + response = DocsStatusesResponse() + + for idx, result in enumerate(results): + status = statuses[idx] + for doc_id, doc_status in result.items(): + if status not in response.statuses: + response.statuses[status] = [] + response.statuses[status].append( + DocStatusResponse( + id=doc_id, + content_summary=doc_status.content_summary, + content_length=doc_status.content_length, + status=doc_status.status, + created_at=doc_status.created_at, + updated_at=doc_status.updated_at, + chunks_count=doc_status.chunks_count, + error=doc_status.error, + metadata=doc_status.metadata, + ) + ) + return response + except Exception as e: + logging.error(f"Error GET /documents: {str(e)}") + logging.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) @app.get("/health", dependencies=[Depends(optional_api_key)]) async def get_status(): """Get current system status""" - files = doc_manager.scan_directory() return { "status": "healthy", "working_directory": str(args.working_dir), "input_directory": str(args.input_dir), - "indexed_files": [str(f) for f in files], - "indexed_files_count": len(files), "configuration": { # LLM configuration binding/host address (if applicable)/model (if applicable) "llm_binding": args.llm_binding, diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 9909b4b7..a7c5920e 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1254,6 +1254,16 @@ class LightRAG: """ return await self.doc_status.get_status_counts() + async def get_docs_by_status( + self, status: DocStatus + ) -> dict[str, DocProcessingStatus]: + """Get documents by status + + Returns: + Dict with document id is keys and document status is values + """ + return await self.doc_status.get_docs_by_status(status) + async def adelete_by_doc_id(self, doc_id: str) -> None: """Delete a document and all its related data