feat(api): Add Pydantic models for all endpoints in document_routes.py

This commit is contained in:
yangdx
2025-03-31 23:53:14 +08:00
parent 8845779ed7
commit d54bda8d36

View File

@@ -10,7 +10,7 @@ import traceback
import pipmaster as pm import pipmaster as pm
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
@@ -30,7 +30,37 @@ router = APIRouter(
temp_prefix = "__tmp__" temp_prefix = "__tmp__"
class ScanResponse(BaseModel):
    """Response model for the document scanning operation.

    Attributes:
        status: Status of the scanning operation; the only accepted value
            is "scanning_started".
        message: Optional message with additional details.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    model_config = {
        "json_schema_extra": {
            "example": {
                "status": "scanning_started",
                "message": "Scanning process has been initiated in the background",
            }
        }
    }

    status: Literal["scanning_started"] = Field(
        description="Status of the scanning operation"
    )
    message: Optional[str] = Field(
        default=None, description="Additional details about the scanning operation"
    )
class InsertTextRequest(BaseModel): class InsertTextRequest(BaseModel):
"""Request model for inserting a single text document
Attributes:
text: The text content to be inserted into the RAG system
"""
text: str = Field( text: str = Field(
min_length=1, min_length=1,
description="The text to insert", description="The text to insert",
@@ -41,8 +71,21 @@ class InsertTextRequest(BaseModel):
def strip_after(cls, text: str) -> str: def strip_after(cls, text: str) -> str:
return text.strip() return text.strip()
class Config:
json_schema_extra = {
"example": {
"text": "This is a sample text to be inserted into the RAG system."
}
}
class InsertTextsRequest(BaseModel): class InsertTextsRequest(BaseModel):
"""Request model for inserting multiple text documents
Attributes:
texts: List of text contents to be inserted into the RAG system
"""
texts: list[str] = Field( texts: list[str] = Field(
min_length=1, min_length=1,
description="The texts to insert", description="The texts to insert",
@@ -53,39 +96,100 @@ class InsertTextsRequest(BaseModel):
def strip_after(cls, texts: list[str]) -> list[str]: def strip_after(cls, texts: list[str]) -> list[str]:
return [text.strip() for text in texts] return [text.strip() for text in texts]
class Config:
json_schema_extra = {
"example": {
"texts": [
"This is the first text to be inserted.",
"This is the second text to be inserted.",
]
}
}
class InsertResponse(BaseModel):
    """Response model for document insertion operations.

    Attributes:
        status: Status of the operation (success, duplicated,
            partial_success, failure).
        message: Detailed message describing the operation result.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    model_config = {
        "json_schema_extra": {
            "example": {
                "status": "success",
                "message": "File 'document.pdf' uploaded successfully. Processing will continue in background.",
            }
        }
    }

    status: Literal["success", "duplicated", "partial_success", "failure"] = Field(
        description="Status of the operation"
    )
    message: str = Field(description="Message describing the operation result")
class ClearDocumentsResponse(BaseModel):
    """Response model for the document clearing operation.

    Attributes:
        status: Status of the clear operation (success, partial_success,
            busy, fail).
        message: Detailed message describing the operation result.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    model_config = {
        "json_schema_extra": {
            "example": {
                "status": "success",
                "message": "All documents cleared successfully. Deleted 15 files.",
            }
        }
    }

    status: Literal["success", "partial_success", "busy", "fail"] = Field(
        description="Status of the clear operation"
    )
    message: str = Field(description="Message describing the operation result")
class ClearCacheRequest(BaseModel):
    """Request model for clearing cache.

    Attributes:
        modes: Optional list of cache modes to clear. When omitted (None),
            all cache is cleared.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    model_config = {
        "json_schema_extra": {"example": {"modes": ["default", "naive"]}}
    }

    modes: Optional[
        List[Literal["default", "naive", "local", "global", "hybrid", "mix"]]
    ] = Field(
        default=None,
        description="Modes of cache to clear. If None, clears all cache.",
    )
class ClearCacheResponse(BaseModel):
    """Response model for the cache clearing operation.

    Attributes:
        status: Status of the clear operation (success, fail).
        message: Detailed message describing the operation result.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    model_config = {
        "json_schema_extra": {
            "example": {
                "status": "success",
                "message": "Successfully cleared cache for modes: ['default', 'naive']",
            }
        }
    }

    status: Literal["success", "fail"] = Field(
        description="Status of the clear operation"
    )
    message: str = Field(description="Message describing the operation result")
class DocStatusResponse(BaseModel):
    """Response model for document status.

    Attributes:
        id: Document identifier
        content_summary: Summary of document content
        content_length: Length of document content in characters
        status: Current processing status
        created_at: Creation timestamp (ISO format string)
        updated_at: Last update timestamp (ISO format string)
        chunks_count: Number of chunks (optional)
        error: Error message if any (optional)
        metadata: Additional metadata (optional)
        file_path: Path to the document file
    """

    # The docstring above must be the first statement of the class body;
    # placed before the ``class`` line it is not attached to the class.

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    # NOTE(review): confirm the example "status" string matches an actual
    # DocStatus value; DocStatus is defined outside this view.
    model_config = {
        "json_schema_extra": {
            "example": {
                "id": "doc_123456",
                "content_summary": "Research paper on machine learning",
                "content_length": 15240,
                "status": "PROCESSED",
                "created_at": "2025-03-31T12:34:56",
                "updated_at": "2025-03-31T12:35:30",
                "chunks_count": 12,
                "error": None,
                "metadata": {"author": "John Doe", "year": 2025},
                "file_path": "research_paper.pdf",
            }
        }
    }

    @staticmethod
    def format_datetime(dt: Any) -> Optional[str]:
        """Convert a timestamp-like value to a string.

        Args:
            dt: ``None``, a string, or an object exposing ``isoformat()``.

        Returns:
            ``None`` when ``dt`` is ``None``, ``dt`` unchanged when it is
            already a string, otherwise the result of ``dt.isoformat()``.
        """
        if dt is None:
            return None
        if isinstance(dt, str):
            return dt
        return dt.isoformat()

    id: str = Field(description="Document identifier")
    content_summary: str = Field(description="Summary of document content")
    content_length: int = Field(description="Length of document content in characters")
    status: DocStatus = Field(description="Current processing status")
    created_at: str = Field(description="Creation timestamp (ISO format string)")
    updated_at: str = Field(description="Last update timestamp (ISO format string)")
    chunks_count: Optional[int] = Field(
        default=None, description="Number of chunks the document was split into"
    )
    error: Optional[str] = Field(
        default=None, description="Error message if processing failed"
    )
    metadata: Optional[dict[str, Any]] = Field(
        default=None, description="Additional metadata about the document"
    )
    file_path: str = Field(description="Path to the document file")
class DocsStatusesResponse(BaseModel):
    """Response model for document statuses.

    Attributes:
        statuses: Dictionary mapping document status to lists of document
            status responses. Defaults to an empty dictionary.
    """

    # Pydantic v2 style configuration: the nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning at class creation.
    # NOTE(review): confirm the example status keys/values match actual
    # DocStatus values; DocStatus is defined outside this view.
    model_config = {
        "json_schema_extra": {
            "example": {
                "statuses": {
                    "PENDING": [
                        {
                            "id": "doc_123",
                            "content_summary": "Pending document",
                            "content_length": 5000,
                            "status": "PENDING",
                            "created_at": "2025-03-31T10:00:00",
                            "updated_at": "2025-03-31T10:00:00",
                            "file_path": "pending_doc.pdf",
                        }
                    ],
                    "PROCESSED": [
                        {
                            "id": "doc_456",
                            "content_summary": "Processed document",
                            "content_length": 8000,
                            "status": "PROCESSED",
                            "created_at": "2025-03-31T09:00:00",
                            "updated_at": "2025-03-31T09:05:00",
                            "chunks_count": 8,
                            "file_path": "processed_doc.pdf",
                        }
                    ],
                }
            }
        }
    }

    statuses: Dict[DocStatus, List[DocStatusResponse]] = Field(
        default_factory=dict,
        description="Dictionary mapping document status to lists of document status responses",
    )
class PipelineStatusResponse(BaseModel): class PipelineStatusResponse(BaseModel):
@@ -529,7 +706,9 @@ def create_document_routes(
# Create combined auth dependency for document routes # Create combined auth dependency for document routes
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
@router.post("/scan", dependencies=[Depends(combined_auth)]) @router.post(
"/scan", response_model=ScanResponse, dependencies=[Depends(combined_auth)]
)
async def scan_for_new_documents(background_tasks: BackgroundTasks): async def scan_for_new_documents(background_tasks: BackgroundTasks):
""" """
Trigger the scanning process for new documents. Trigger the scanning process for new documents.
@@ -539,13 +718,18 @@ def create_document_routes(
that fact. that fact.
Returns: Returns:
dict: A dictionary containing the scanning status ScanResponse: A response object containing the scanning status
""" """
# Start the scanning process in the background # Start the scanning process in the background
background_tasks.add_task(run_scanning_process, rag, doc_manager) background_tasks.add_task(run_scanning_process, rag, doc_manager)
return {"status": "scanning_started"} return ScanResponse(
status="scanning_started",
message="Scanning process has been initiated in the background",
)
@router.post("/upload", dependencies=[Depends(combined_auth)]) @router.post(
"/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
)
async def upload_to_input_dir( async def upload_to_input_dir(
background_tasks: BackgroundTasks, file: UploadFile = File(...) background_tasks: BackgroundTasks, file: UploadFile = File(...)
): ):
@@ -1016,7 +1200,9 @@ def create_document_routes(
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@router.get("", dependencies=[Depends(combined_auth)]) @router.get(
"", response_model=DocsStatusesResponse, dependencies=[Depends(combined_auth)]
)
async def documents() -> DocsStatusesResponse: async def documents() -> DocsStatusesResponse:
""" """
Get the status of all documents in the system. Get the status of all documents in the system.