refactor file indexing for background async processing

ArnoChen
2025-02-15 22:25:48 +08:00
parent 4d58ff8bb4
commit 147d73bd56


@@ -14,7 +14,7 @@ import re
 from fastapi.staticfiles import StaticFiles
 import logging
 import argparse
-from typing import List, Any, Optional, Union, Dict
+from typing import List, Any, Optional, Dict
 from pydantic import BaseModel
 from lightrag import LightRAG, QueryParam
 from lightrag.types import GPTKeywordExtractionFormat
@@ -34,6 +34,9 @@ from starlette.status import HTTP_403_FORBIDDEN
 import pipmaster as pm
 from dotenv import load_dotenv
 import configparser
+import traceback
+from datetime import datetime
+
 from lightrag.utils import logger
 from .ollama_api import (
     OllamaAPI,
@@ -645,7 +648,6 @@ class InsertTextRequest(BaseModel):
 class InsertResponse(BaseModel):
     status: str
     message: str
-    document_count: int
 
 
 def get_api_key_dependency(api_key: Optional[str]):
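
With processing deferred to background tasks, an accurate per-request document count is no longer known when the response is sent, so `document_count` is dropped from `InsertResponse`. A typical response now carries only status and message, e.g. (message text taken from the new endpoints below):

    InsertResponse(status="success", message="Text successfully received. Processing will continue in background.")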
@@ -675,6 +677,7 @@ def get_api_key_dependency(api_key: Optional[str]):
 # Global configuration
 global_top_k = 60  # default value
+temp_prefix = "__tmp_"  # prefix for temporary files
 
 
 def create_app(args):
@@ -1116,79 +1119,122 @@ def create_app(args):
             ("llm_response_cache", rag.llm_response_cache),
         ]
 
-    async def index_file(file_path: Union[str, Path]) -> None:
-        """Index all files inside the folder with support for multiple file formats
-
-        Args:
-            file_path: Path to the file to be indexed (str or Path object)
-        """
-        if not pm.is_installed("aiofiles"):
-            pm.install("aiofiles")
-
-        # Convert to Path object if string
-        file_path = Path(file_path)
-
-        # Check if file exists
-        if not file_path.exists():
-            raise FileNotFoundError(f"File not found: {file_path}")
-
-        content = ""
-        # Get file extension in lowercase
-        ext = file_path.suffix.lower()
-
-        match ext:
-            case ".txt" | ".md":
-                # Text files handling
-                async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
-                    content = await f.read()
-
-            case ".pdf" | ".docx" | ".pptx" | ".xlsx":
-                if not pm.is_installed("docling"):
-                    pm.install("docling")
-                from docling.document_converter import DocumentConverter
-
-                async def convert_doc():
-                    def sync_convert():
-                        converter = DocumentConverter()
-                        result = converter.convert(file_path)
-                        return result.document.export_to_markdown()
-
-                    return await asyncio.to_thread(sync_convert)
-
-                content = await convert_doc()
-
-            case _:
-                raise ValueError(f"Unsupported file format: {ext}")
-
-        # Insert content into RAG system
-        if content:
-            await rag.ainsert(content)
-            doc_manager.mark_as_indexed(file_path)
-            logging.info(f"Successfully indexed file: {file_path}")
-        else:
-            logging.warning(f"No content extracted from file: {file_path}")
-
-    @app.post("/documents/scan", dependencies=[Depends(optional_api_key)])
-    async def scan_for_new_documents(background_tasks: BackgroundTasks):
-        """Trigger the scanning process"""
-        global scan_progress
-        with progress_lock:
-            if scan_progress["is_scanning"]:
-                return {"status": "already_scanning"}
-            scan_progress["is_scanning"] = True
-            scan_progress["indexed_count"] = 0
-            scan_progress["progress"] = 0
-
-        # Start the scanning process in the background
-        background_tasks.add_task(run_scanning_process)
-
-        return {"status": "scanning_started"}
+    async def index_file(file_path: Path, description: Optional[str] = None):
+        """Index a file
+
+        Args:
+            file_path: Path to the saved file
+            description: Optional description of the file
+        """
+        try:
+            content = ""
+            ext = file_path.suffix.lower()
+
+            file = None
+            async with aiofiles.open(file_path, "rb") as f:
+                file = await f.read()
+
+            # Process based on file type
+            match ext:
+                case ".txt" | ".md":
+                    content = file.decode("utf-8")
+                case ".pdf":
+                    if not pm.is_installed("pypdf2"):
+                        pm.install("pypdf2")
+                    from PyPDF2 import PdfReader
+                    from io import BytesIO
+
+                    pdf_file = BytesIO(file)
+                    reader = PdfReader(pdf_file)
+                    for page in reader.pages:
+                        content += page.extract_text() + "\n"
+                case ".docx":
+                    if not pm.is_installed("docx"):
+                        pm.install("docx")
+                    from docx import Document
+                    from io import BytesIO
+
+                    docx_file = BytesIO(file)
+                    doc = Document(docx_file)
+                    content = "\n".join(
+                        [paragraph.text for paragraph in doc.paragraphs]
+                    )
+                case ".pptx":
+                    if not pm.is_installed("pptx"):
+                        pm.install("pptx")
+                    from pptx import Presentation  # type: ignore
+                    from io import BytesIO
+
+                    pptx_file = BytesIO(file)
+                    prs = Presentation(pptx_file)
+                    for slide in prs.slides:
+                        for shape in slide.shapes:
+                            if hasattr(shape, "text"):
+                                content += shape.text + "\n"
+                case _:
+                    logging.error(
+                        f"Unsupported file type: {file_path.name} (extension {ext})"
+                    )
+                    return
+
+            # Add description if provided
+            if description:
+                content = f"{description}\n\n{content}"
+
+            # Insert into RAG system
+            if content:
+                await rag.ainsert(content)
+                logging.info(
+                    f"Successfully processed and indexed file: {file_path.name}"
+                )
+            else:
+                logging.error(
+                    f"No content could be extracted from file: {file_path.name}"
+                )
+        except Exception as e:
+            logging.error(f"Error indexing file {file_path.name}: {str(e)}")
+            logging.error(traceback.format_exc())
+        finally:
+            if file_path.name.startswith(temp_prefix):
+                # Clean up the temporary file after indexing
+                try:
+                    file_path.unlink()
+                except Exception as e:
+                    logging.error(f"Error deleting file {file_path}: {str(e)}")
+
+    async def batch_index_files(file_paths: List[Path]):
+        """Index multiple files
+
+        Args:
+            file_paths: Paths to the files to index
+        """
+        for file_path in file_paths:
+            await index_file(file_path)
+
+    def save_temp_file(file: UploadFile) -> Path:
+        """Save the uploaded file to a temporary location
+
+        Args:
+            file: The uploaded file
+
+        Returns:
+            Path: The path to the saved file
+        """
+        # Generate unique filename to avoid conflicts
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        unique_filename = f"{temp_prefix}{timestamp}_{file.filename}"
+
+        # Create a temporary file to save the uploaded content
+        temp_path = doc_manager.input_dir / "temp" / unique_filename
+        temp_path.parent.mkdir(exist_ok=True)
+
+        # Save the file
+        with open(temp_path, "wb") as buffer:
+            shutil.copyfileobj(file.file, buffer)
+        return temp_path
 
     async def run_scanning_process():
         """Background task to scan and index documents"""
@@ -1221,6 +1267,24 @@ def create_app(args):
             with progress_lock:
                 scan_progress["is_scanning"] = False
 
+    @app.post("/documents/scan", dependencies=[Depends(optional_api_key)])
+    async def scan_for_new_documents(background_tasks: BackgroundTasks):
+        """Trigger the scanning process"""
+        global scan_progress
+        with progress_lock:
+            if scan_progress["is_scanning"]:
+                return {"status": "already_scanning"}
+            scan_progress["is_scanning"] = True
+            scan_progress["indexed_count"] = 0
+            scan_progress["progress"] = 0
+
+        # Start the scanning process in the background
+        background_tasks.add_task(run_scanning_process)
+
+        return {"status": "scanning_started"}
+
     @app.get("/documents/scan-progress")
     async def get_scan_progress():
         """Get the current scanning progress"""
@@ -1228,7 +1292,9 @@ def create_app(args):
         return scan_progress
 
     @app.post("/documents/upload", dependencies=[Depends(optional_api_key)])
-    async def upload_to_input_dir(file: UploadFile = File(...)):
+    async def upload_to_input_dir(
+        background_tasks: BackgroundTasks, file: UploadFile = File(...)
+    ):
         """
         Endpoint for uploading a file to the input directory and indexing it.
 
@@ -1237,6 +1303,7 @@ def create_app(args):
         indexes it for retrieval, and returns a success status with relevant details.
 
         Parameters:
+            background_tasks: FastAPI BackgroundTasks for async processing
             file (UploadFile): The file to be uploaded. It must have an allowed extension as per
                 `doc_manager.supported_extensions`.
@@ -1261,15 +1328,178 @@ def create_app(args):
             with open(file_path, "wb") as buffer:
                 shutil.copyfileobj(file.file, buffer)
 
-            # Immediately index the uploaded file
-            await index_file(file_path)
-
-            return {
-                "status": "success",
-                "message": f"File uploaded and indexed: {file.filename}",
-                "total_documents": len(doc_manager.indexed_files),
-            }
+            # Add to background tasks
+            background_tasks.add_task(index_file, file_path)
+
+            return InsertResponse(
+                status="success",
+                message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.",
+            )
         except Exception as e:
+            logging.error(f"Error /documents/upload: {file.filename}: {str(e)}")
+            logging.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @app.post(
+        "/documents/text",
+        response_model=InsertResponse,
+        dependencies=[Depends(optional_api_key)],
+    )
+    async def insert_text(
+        request: InsertTextRequest, background_tasks: BackgroundTasks
+    ):
+        """
+        Insert text into the Retrieval-Augmented Generation (RAG) system.
+
+        This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses.
+
+        Args:
+            request (InsertTextRequest): The request body containing the text to be inserted.
+            background_tasks: FastAPI BackgroundTasks for async processing
+
+        Returns:
+            InsertResponse: A response object containing the status of the operation and a message.
+        """
+        try:
+            background_tasks.add_task(rag.ainsert, request.text)
+            return InsertResponse(
+                status="success",
+                message="Text successfully received. Processing will continue in background.",
+            )
+        except Exception as e:
+            logging.error(f"Error /documents/text: {str(e)}")
+            logging.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @app.post(
+        "/documents/file",
+        response_model=InsertResponse,
+        dependencies=[Depends(optional_api_key)],
+    )
+    async def insert_file(
+        background_tasks: BackgroundTasks,
+        file: UploadFile = File(...),
+        description: str = Form(None),
+    ):
+        """Insert a file directly into the RAG system
+
+        Args:
+            background_tasks: FastAPI BackgroundTasks for async processing
+            file: Uploaded file
+            description: Optional description of the file
+
+        Returns:
+            InsertResponse: Status of the insertion operation
+
+        Raises:
+            HTTPException: For unsupported file types or processing errors
+        """
+        try:
+            if not doc_manager.is_supported_file(file.filename):
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
+                )
+
+            # Create a temporary file to save the uploaded content
+            temp_path = save_temp_file(file)
+
+            # Add to background tasks
+            background_tasks.add_task(index_file, temp_path, description)
+
+            return InsertResponse(
+                status="success",
+                message=f"File '{file.filename}' saved successfully. Processing will continue in background.",
+            )
+        except Exception as e:
+            logging.error(f"Error /documents/file: {str(e)}")
+            logging.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @app.post(
+        "/documents/batch",
+        response_model=InsertResponse,
+        dependencies=[Depends(optional_api_key)],
+    )
+    async def insert_batch(
+        background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)
+    ):
+        """Process multiple files in batch mode
+
+        Args:
+            background_tasks: FastAPI BackgroundTasks for async processing
+            files: List of files to process
+
+        Returns:
+            InsertResponse: Status of the batch insertion operation
+
+        Raises:
+            HTTPException: For processing errors
+        """
+        try:
+            inserted_count = 0
+            failed_files = []
+            temp_files = []
+
+            for file in files:
+                if doc_manager.is_supported_file(file.filename):
+                    # Create a temporary file to save the uploaded content
+                    temp_files.append(save_temp_file(file))
+                    inserted_count += 1
+                else:
+                    failed_files.append(f"{file.filename} (unsupported type)")
+
+            if temp_files:
+                background_tasks.add_task(batch_index_files, temp_files)
+
+            # Prepare status message
+            if inserted_count == len(files):
+                status = "success"
+                status_message = f"Successfully inserted all {inserted_count} documents"
+            elif inserted_count > 0:
+                status = "partial_success"
+                status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
+                if failed_files:
+                    status_message += f". Failed files: {', '.join(failed_files)}"
+            else:
+                status = "failure"
+                status_message = "No documents were successfully inserted"
+                if failed_files:
+                    status_message += f". Failed files: {', '.join(failed_files)}"
+
+            return InsertResponse(status=status, message=status_message)
+        except Exception as e:
+            logging.error(f"Error /documents/batch: {str(e)}")
+            logging.error(traceback.format_exc())
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @app.delete(
+        "/documents",
+        response_model=InsertResponse,
+        dependencies=[Depends(optional_api_key)],
+    )
+    async def clear_documents():
+        """
+        Clear all documents from the LightRAG system.
+
+        This endpoint deletes all text chunks, entities vector database, and relationships vector database,
+        effectively clearing all documents from the LightRAG system.
+
+        Returns:
+            InsertResponse: A response object containing the status and message.
+        """
+        try:
+            rag.text_chunks = []
+            rag.entities_vdb = None
+            rag.relationships_vdb = None
+            return InsertResponse(
+                status="success", message="All documents cleared successfully"
+            )
+        except Exception as e:
+            logging.error(f"Error DELETE /documents: {str(e)}")
+            logging.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))
 
     @app.post(
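
For reference, a minimal client sketch of the new deferred endpoints (again illustrative: assumes httpx and a server at http://localhost:9621; the request body field `text` matches InsertTextRequest as used by `rag.ainsert(request.text)`):

    import httpx

    BASE = "http://localhost:9621"  # illustrative server address

    # Queue raw text; insertion happens after the response is sent
    r = httpx.post(f"{BASE}/documents/text", json={"text": "LightRAG indexes documents."})
    print(r.json())  # {"status": "success", "message": "Text successfully received. ..."}

    # Queue a single file, with an optional description sent as a form field
    with open("report.pdf", "rb") as f:
        r = httpx.post(
            f"{BASE}/documents/file",
            files={"file": ("report.pdf", f)},
            data={"description": "Quarterly report"},
        )
    print(r.json())

    # Queue several files at once; unsupported types are reported in the message
    with open("a.txt", "rb") as fa, open("b.docx", "rb") as fb:
        r = httpx.post(
            f"{BASE}/documents/batch",
            files=[("files", ("a.txt", fa)), ("files", ("b.docx", fb))],
        )
    print(r.json())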
@@ -1381,255 +1611,6 @@ def create_app(args):
             trace_exception(e)
             raise HTTPException(status_code=500, detail=str(e))
 
-    @app.post(
-        "/documents/text",
-        response_model=InsertResponse,
-        dependencies=[Depends(optional_api_key)],
-    )
-    async def insert_text(request: InsertTextRequest):
-        """
-        Insert text into the Retrieval-Augmented Generation (RAG) system.
-
-        This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses.
-
-        Args:
-            request (InsertTextRequest): The request body containing the text to be inserted.
-
-        Returns:
-            InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted.
-        """
-        try:
-            await rag.ainsert(request.text)
-            return InsertResponse(
-                status="success",
-                message="Text successfully inserted",
-                document_count=1,
-            )
-        except Exception as e:
-            raise HTTPException(status_code=500, detail=str(e))
-
-    @app.post(
-        "/documents/file",
-        response_model=InsertResponse,
-        dependencies=[Depends(optional_api_key)],
-    )
-    async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
-        """Insert a file directly into the RAG system
-
-        Args:
-            file: Uploaded file
-            description: Optional description of the file
-
-        Returns:
-            InsertResponse: Status of the insertion operation
-
-        Raises:
-            HTTPException: For unsupported file types or processing errors
-        """
-        try:
-            content = ""
-            # Get file extension in lowercase
-            ext = Path(file.filename).suffix.lower()
-
-            match ext:
-                case ".txt" | ".md":
-                    # Text files handling
-                    text_content = await file.read()
-                    content = text_content.decode("utf-8")
-
-                case ".pdf" | ".docx" | ".pptx" | ".xlsx":
-                    if not pm.is_installed("docling"):
-                        pm.install("docling")
-                    from docling.document_converter import DocumentConverter
-
-                    # Create a temporary file to save the uploaded content
-                    temp_path = Path("temp") / file.filename
-                    temp_path.parent.mkdir(exist_ok=True)
-
-                    # Save the uploaded file
-                    with temp_path.open("wb") as f:
-                        f.write(await file.read())
-
-                    try:
-
-                        async def convert_doc():
-                            def sync_convert():
-                                converter = DocumentConverter()
-                                result = converter.convert(str(temp_path))
-                                return result.document.export_to_markdown()
-
-                            return await asyncio.to_thread(sync_convert)
-
-                        content = await convert_doc()
-                    finally:
-                        # Clean up the temporary file
-                        temp_path.unlink()
-
-            # Insert content into RAG system
-            if content:
-                # Add description if provided
-                if description:
-                    content = f"{description}\n\n{content}"
-
-                await rag.ainsert(content)
-                logging.info(f"Successfully indexed file: {file.filename}")
-
-                return InsertResponse(
-                    status="success",
-                    message=f"File '{file.filename}' successfully inserted",
-                    document_count=1,
-                )
-            else:
-                raise HTTPException(
-                    status_code=400,
-                    detail="No content could be extracted from the file",
-                )
-
-        except UnicodeDecodeError:
-            raise HTTPException(status_code=400, detail="File encoding not supported")
-        except Exception as e:
-            logging.error(f"Error processing file {file.filename}: {str(e)}")
-            raise HTTPException(status_code=500, detail=str(e))
-
-    @app.post(
-        "/documents/batch",
-        response_model=InsertResponse,
-        dependencies=[Depends(optional_api_key)],
-    )
-    async def insert_batch(files: List[UploadFile] = File(...)):
-        """Process multiple files in batch mode
-
-        Args:
-            files: List of files to process
-
-        Returns:
-            InsertResponse: Status of the batch insertion operation
-
-        Raises:
-            HTTPException: For processing errors
-        """
-        try:
-            inserted_count = 0
-            failed_files = []
-
-            for file in files:
-                try:
-                    content = ""
-                    ext = Path(file.filename).suffix.lower()
-
-                    match ext:
-                        case ".txt" | ".md":
-                            text_content = await file.read()
-                            content = text_content.decode("utf-8")
-
-                        case ".pdf":
-                            if not pm.is_installed("pypdf2"):
-                                pm.install("pypdf2")
-                            from PyPDF2 import PdfReader
-                            from io import BytesIO
-
-                            pdf_content = await file.read()
-                            pdf_file = BytesIO(pdf_content)
-                            reader = PdfReader(pdf_file)
-                            for page in reader.pages:
-                                content += page.extract_text() + "\n"
-
-                        case ".docx":
-                            if not pm.is_installed("docx"):
-                                pm.install("docx")
-                            from docx import Document
-                            from io import BytesIO
-
-                            docx_content = await file.read()
-                            docx_file = BytesIO(docx_content)
-                            doc = Document(docx_file)
-                            content = "\n".join(
-                                [paragraph.text for paragraph in doc.paragraphs]
-                            )
-
-                        case ".pptx":
-                            if not pm.is_installed("pptx"):
-                                pm.install("pptx")
-                            from pptx import Presentation  # type: ignore
-                            from io import BytesIO
-
-                            pptx_content = await file.read()
-                            pptx_file = BytesIO(pptx_content)
-                            prs = Presentation(pptx_file)
-                            for slide in prs.slides:
-                                for shape in slide.shapes:
-                                    if hasattr(shape, "text"):
-                                        content += shape.text + "\n"
-
-                        case _:
-                            failed_files.append(f"{file.filename} (unsupported type)")
-                            continue
-
-                    if content:
-                        await rag.ainsert(content)
-                        inserted_count += 1
-                        logging.info(f"Successfully indexed file: {file.filename}")
-                    else:
-                        failed_files.append(f"{file.filename} (no content extracted)")
-
-                except UnicodeDecodeError:
-                    failed_files.append(f"{file.filename} (encoding error)")
-                except Exception as e:
-                    failed_files.append(f"{file.filename} ({str(e)})")
-                    logging.error(f"Error processing file {file.filename}: {str(e)}")
-
-            # Prepare status message
-            if inserted_count == len(files):
-                status = "success"
-                status_message = f"Successfully inserted all {inserted_count} documents"
-            elif inserted_count > 0:
-                status = "partial_success"
-                status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
-                if failed_files:
-                    status_message += f". Failed files: {', '.join(failed_files)}"
-            else:
-                status = "failure"
-                status_message = "No documents were successfully inserted"
-                if failed_files:
-                    status_message += f". Failed files: {', '.join(failed_files)}"
-
-            return InsertResponse(
-                status=status,
-                message=status_message,
-                document_count=inserted_count,
-            )
-
-        except Exception as e:
-            logging.error(f"Batch processing error: {str(e)}")
-            raise HTTPException(status_code=500, detail=str(e))
-
-    @app.delete(
-        "/documents",
-        response_model=InsertResponse,
-        dependencies=[Depends(optional_api_key)],
-    )
-    async def clear_documents():
-        """
-        Clear all documents from the LightRAG system.
-
-        This endpoint deletes all text chunks, entities vector database, and relationships vector database,
-        effectively clearing all documents from the LightRAG system.
-
-        Returns:
-            InsertResponse: A response object containing the status, message, and the new document count (0 in this case).
-        """
-        try:
-            rag.text_chunks = []
-            rag.entities_vdb = None
-            rag.relationships_vdb = None
-            return InsertResponse(
-                status="success",
-                message="All documents cleared successfully",
-                document_count=0,
-            )
-        except Exception as e:
-            raise HTTPException(status_code=500, detail=str(e))
-
     # query all graph labels
     @app.get("/graph/label/list")
     async def get_graph_labels():