implement endpoint to retrieve document statuses

This commit is contained in:
ArnoChen
2025-02-17 01:03:05 +08:00
parent 801a980c03
commit ca85b82a47
2 changed files with 70 additions and 6 deletions

View File

@@ -19,6 +19,7 @@ from lightrag import LightRAG, QueryParam
from lightrag.types import GPTKeywordExtractionFormat from lightrag.types import GPTKeywordExtractionFormat
from lightrag.api import __api_version__ from lightrag.api import __api_version__
from lightrag.utils import EmbeddingFunc from lightrag.utils import EmbeddingFunc
from lightrag.base import DocStatus, DocProcessingStatus
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
import shutil import shutil
@@ -693,6 +694,22 @@ class InsertResponse(BaseModel):
message: str message: str
class DocStatusResponse(BaseModel):
id: str
content_summary: str
content_length: int
status: DocStatus
created_at: str
updated_at: str
chunks_count: Optional[int] = None
error: Optional[str] = None
metadata: Optional[dict[str, Any]] = None
class DocsStatusesResponse(BaseModel):
statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
def QueryRequestToQueryParams(request: QueryRequest): def QueryRequestToQueryParams(request: QueryRequest):
param = QueryParam(mode=request.mode, stream=request.stream) param = QueryParam(mode=request.mode, stream=request.stream)
if request.only_need_context is not None: if request.only_need_context is not None:
@@ -1728,20 +1745,57 @@ def create_app(args):
app.include_router(ollama_api.router, prefix="/api") app.include_router(ollama_api.router, prefix="/api")
@app.get("/documents", dependencies=[Depends(optional_api_key)]) @app.get("/documents", dependencies=[Depends(optional_api_key)])
async def documents(): async def documents() -> DocsStatusesResponse:
"""Get current system status""" """
return doc_manager.indexed_files Get documents statuses
Returns:
DocsStatusesResponse: A response object containing the status, message, and the number of indexed documents.
"""
try:
statuses = (
DocStatus.PENDING,
DocStatus.PROCESSING,
DocStatus.PROCESSED,
DocStatus.FAILED,
)
tasks = [rag.get_docs_by_status(status) for status in statuses]
results: List[Dict[str, DocProcessingStatus]] = await asyncio.gather(*tasks)
response = DocsStatusesResponse()
for idx, result in enumerate(results):
status = statuses[idx]
for doc_id, doc_status in result.items():
if status not in response.statuses:
response.statuses[status] = []
response.statuses[status].append(
DocStatusResponse(
id=doc_id,
content_summary=doc_status.content_summary,
content_length=doc_status.content_length,
status=doc_status.status,
created_at=doc_status.created_at,
updated_at=doc_status.updated_at,
chunks_count=doc_status.chunks_count,
error=doc_status.error,
metadata=doc_status.metadata,
)
)
return response
except Exception as e:
logging.error(f"Error GET /documents: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health", dependencies=[Depends(optional_api_key)]) @app.get("/health", dependencies=[Depends(optional_api_key)])
async def get_status(): async def get_status():
"""Get current system status""" """Get current system status"""
files = doc_manager.scan_directory()
return { return {
"status": "healthy", "status": "healthy",
"working_directory": str(args.working_dir), "working_directory": str(args.working_dir),
"input_directory": str(args.input_dir), "input_directory": str(args.input_dir),
"indexed_files": [str(f) for f in files],
"indexed_files_count": len(files),
"configuration": { "configuration": {
# LLM configuration binding/host address (if applicable)/model (if applicable) # LLM configuration binding/host address (if applicable)/model (if applicable)
"llm_binding": args.llm_binding, "llm_binding": args.llm_binding,

View File

@@ -1254,6 +1254,16 @@ class LightRAG:
""" """
return await self.doc_status.get_status_counts() return await self.doc_status.get_status_counts()
async def get_docs_by_status(
self, status: DocStatus
) -> dict[str, DocProcessingStatus]:
"""Get documents by status
Returns:
Dict with document id is keys and document status is values
"""
return await self.doc_status.get_docs_by_status(status)
async def adelete_by_doc_id(self, doc_id: str) -> None: async def adelete_by_doc_id(self, doc_id: str) -> None:
"""Delete a document and all its related data """Delete a document and all its related data