From 147d73bd56ed03ecc0770e8ec0c7a47305d3b0cd Mon Sep 17 00:00:00 2001 From: ArnoChen Date: Sat, 15 Feb 2025 22:25:48 +0800 Subject: [PATCH] refactor file indexing for background async processing --- lightrag/api/lightrag_server.py | 609 ++++++++++++++++---------------- 1 file changed, 295 insertions(+), 314 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 1aeff264..c51933b3 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -14,7 +14,7 @@ import re from fastapi.staticfiles import StaticFiles import logging import argparse -from typing import List, Any, Optional, Union, Dict +from typing import List, Any, Optional, Dict from pydantic import BaseModel from lightrag import LightRAG, QueryParam from lightrag.types import GPTKeywordExtractionFormat @@ -34,6 +34,9 @@ from starlette.status import HTTP_403_FORBIDDEN import pipmaster as pm from dotenv import load_dotenv import configparser +import traceback +from datetime import datetime + from lightrag.utils import logger from .ollama_api import ( OllamaAPI, @@ -645,7 +648,6 @@ class InsertTextRequest(BaseModel): class InsertResponse(BaseModel): status: str message: str - document_count: int def get_api_key_dependency(api_key: Optional[str]): @@ -675,6 +677,7 @@ def get_api_key_dependency(api_key: Optional[str]): # Global configuration global_top_k = 60 # default value +temp_prefix = "__tmp_" # prefix for temporary files def create_app(args): @@ -1116,79 +1119,122 @@ def create_app(args): ("llm_response_cache", rag.llm_response_cache), ] - async def index_file(file_path: Union[str, Path]) -> None: - """Index all files inside the folder with support for multiple file formats + async def index_file(file_path: Path, description: Optional[str] = None): + """Index a file Args: - file_path: Path to the file to be indexed (str or Path object) - - Raises: - ValueError: If file format is not supported - FileNotFoundError: If file doesn't exist + file_path: Path to the saved file + description: Optional description of the file """ - if not pm.is_installed("aiofiles"): - pm.install("aiofiles") + try: + content = "" + ext = file_path.suffix.lower() - # Convert to Path object if string - file_path = Path(file_path) + file = None + async with aiofiles.open(file_path, "rb") as f: + file = await f.read() - # Check if file exists - if not file_path.exists(): - raise FileNotFoundError(f"File not found: {file_path}") + # Process based on file type + match ext: + case ".txt" | ".md": + content = file.decode("utf-8") + case ".pdf": + if not pm.is_installed("pypdf2"): + pm.install("pypdf2") + from PyPDF2 import PdfReader + from io import BytesIO - content = "" - # Get file extension in lowercase - ext = file_path.suffix.lower() + pdf_file = BytesIO(file) + reader = PdfReader(pdf_file) + for page in reader.pages: + content += page.extract_text() + "\n" + case ".docx": + if not pm.is_installed("docx"): + pm.install("docx") + from docx import Document + from io import BytesIO - match ext: - case ".txt" | ".md": - # Text files handling - async with aiofiles.open(file_path, "r", encoding="utf-8") as f: - content = await f.read() + docx_content = await file.read() + docx_file = BytesIO(docx_content) + doc = Document(docx_file) + content = "\n".join( + [paragraph.text for paragraph in doc.paragraphs] + ) + case ".pptx": + if not pm.is_installed("pptx"): + pm.install("pptx") + from pptx import Presentation # type: ignore + from io import BytesIO - case ".pdf" | ".docx" | ".pptx" | ".xlsx": - if not pm.is_installed("docling"): - pm.install("docling") - from docling.document_converter import DocumentConverter + pptx_content = await file.read() + pptx_file = BytesIO(pptx_content) + prs = Presentation(pptx_file) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" + case _: + logging.error( + f"Unsupported file type: {file_path.name} (extension {ext})" + ) + return - async def convert_doc(): - def sync_convert(): - converter = DocumentConverter() - result = converter.convert(file_path) - return result.document.export_to_markdown() + # Add description if provided + if description: + content = f"{description}\n\n{content}" - return await asyncio.to_thread(sync_convert) + # Insert into RAG system + if content: + await rag.ainsert(content) + logging.info( + f"Successfully processed and indexed file: {file_path.name}" + ) + else: + logging.error( + f"No content could be extracted from file: {file_path.name}" + ) - content = await convert_doc() + except Exception as e: + logging.error(f"Error indexing file {file_path.name}: {str(e)}") + logging.error(traceback.format_exc()) + finally: + if file_path.name.startswith(temp_prefix): + # Clean up the temporary file after indexing + try: + file_path.unlink() + except Exception as e: + logging.error(f"Error deleting file {file_path}: {str(e)}") - case _: - raise ValueError(f"Unsupported file format: {ext}") + async def batch_index_files(file_paths: List[Path]): + """Index multiple files - # Insert content into RAG system - if content: - await rag.ainsert(content) - doc_manager.mark_as_indexed(file_path) - logging.info(f"Successfully indexed file: {file_path}") - else: - logging.warning(f"No content extracted from file: {file_path}") + Args: + file_paths: Paths to the files to index + """ + for file_path in file_paths: + await index_file(file_path) - @app.post("/documents/scan", dependencies=[Depends(optional_api_key)]) - async def scan_for_new_documents(background_tasks: BackgroundTasks): - """Trigger the scanning process""" - global scan_progress + async def save_temp_file(file: UploadFile = File(...)) -> Path: + """Save the uploaded file to a temporary location - with progress_lock: - if scan_progress["is_scanning"]: - return {"status": "already_scanning"} + Args: + file: The uploaded file - scan_progress["is_scanning"] = True - scan_progress["indexed_count"] = 0 - scan_progress["progress"] = 0 + Returns: + Path: The path to the saved file + """ + # Generate unique filename to avoid conflicts + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + unique_filename = f"{temp_prefix}{timestamp}_{file.filename}" - # Start the scanning process in the background - background_tasks.add_task(run_scanning_process) + # Create a temporary file to save the uploaded content + temp_path = doc_manager.input_dir / "temp" / unique_filename + temp_path.parent.mkdir(exist_ok=True) - return {"status": "scanning_started"} + # Save the file + with open(temp_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + return temp_path async def run_scanning_process(): """Background task to scan and index documents""" @@ -1221,6 +1267,24 @@ def create_app(args): with progress_lock: scan_progress["is_scanning"] = False + @app.post("/documents/scan", dependencies=[Depends(optional_api_key)]) + async def scan_for_new_documents(background_tasks: BackgroundTasks): + """Trigger the scanning process""" + global scan_progress + + with progress_lock: + if scan_progress["is_scanning"]: + return {"status": "already_scanning"} + + scan_progress["is_scanning"] = True + scan_progress["indexed_count"] = 0 + scan_progress["progress"] = 0 + + # Start the scanning process in the background + background_tasks.add_task(run_scanning_process) + + return {"status": "scanning_started"} + @app.get("/documents/scan-progress") async def get_scan_progress(): """Get the current scanning progress""" @@ -1228,7 +1292,9 @@ def create_app(args): return scan_progress @app.post("/documents/upload", dependencies=[Depends(optional_api_key)]) - async def upload_to_input_dir(file: UploadFile = File(...)): + async def upload_to_input_dir( + background_tasks: BackgroundTasks, file: UploadFile = File(...) + ): """ Endpoint for uploading a file to the input directory and indexing it. @@ -1237,6 +1303,7 @@ def create_app(args): indexes it for retrieval, and returns a success status with relevant details. Parameters: + background_tasks: FastAPI BackgroundTasks for async processing file (UploadFile): The file to be uploaded. It must have an allowed extension as per `doc_manager.supported_extensions`. @@ -1261,15 +1328,178 @@ def create_app(args): with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) - # Immediately index the uploaded file - await index_file(file_path) + # Add to background tasks + background_tasks.add_task(index_file, file_path) - return { - "status": "success", - "message": f"File uploaded and indexed: {file.filename}", - "total_documents": len(doc_manager.indexed_files), - } + return InsertResponse( + status="success", + message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.", + ) except Exception as e: + logging.error(f"Error /documents/upload: {file.filename}: {str(e)}") + logging.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + @app.post( + "/documents/text", + response_model=InsertResponse, + dependencies=[Depends(optional_api_key)], + ) + async def insert_text( + request: InsertTextRequest, background_tasks: BackgroundTasks + ): + """ + Insert text into the Retrieval-Augmented Generation (RAG) system. + + This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses. + + Args: + request (InsertTextRequest): The request body containing the text to be inserted. + background_tasks: FastAPI BackgroundTasks for async processing + + Returns: + InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted. + """ + try: + background_tasks.add_task(rag.ainsert, request.text) + return InsertResponse( + status="success", + message="Text successfully received. Processing will continue in background.", + ) + except Exception as e: + logging.error(f"Error /documents/text: {str(e)}") + logging.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + @app.post( + "/documents/file", + response_model=InsertResponse, + dependencies=[Depends(optional_api_key)], + ) + async def insert_file( + background_tasks: BackgroundTasks, + file: UploadFile = File(...), + description: str = Form(None), + ): + """Insert a file directly into the RAG system + + Args: + background_tasks: FastAPI BackgroundTasks for async processing + file: Uploaded file + description: Optional description of the file + + Returns: + InsertResponse: Status of the insertion operation + + Raises: + HTTPException: For unsupported file types or processing errors + """ + try: + if not doc_manager.is_supported_file(file.filename): + raise HTTPException( + status_code=400, + detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}", + ) + + # Create a temporary file to save the uploaded content + temp_path = save_temp_file(file) + + # Add to background tasks + background_tasks.add_task(index_file, temp_path, description) + + return InsertResponse( + status="success", + message=f"File '{file.filename}' saved successfully. Processing will continue in background.", + ) + + except Exception as e: + logging.error(f"Error /documents/file: {str(e)}") + logging.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + @app.post( + "/documents/batch", + response_model=InsertResponse, + dependencies=[Depends(optional_api_key)], + ) + async def insert_batch( + background_tasks: BackgroundTasks, files: List[UploadFile] = File(...) + ): + """Process multiple files in batch mode + + Args: + background_tasks: FastAPI BackgroundTasks for async processing + files: List of files to process + + Returns: + InsertResponse: Status of the batch insertion operation + + Raises: + HTTPException: For processing errors + """ + try: + inserted_count = 0 + failed_files = [] + temp_files = [] + + for file in files: + if doc_manager.is_supported_file(file.filename): + # Create a temporary file to save the uploaded content + temp_files.append(save_temp_file(file)) + inserted_count += 1 + else: + failed_files.append(f"{file.filename} (unsupported type)") + + if temp_files: + background_tasks.add_task(batch_index_files, temp_files) + + # Prepare status message + if inserted_count == len(files): + status = "success" + status_message = f"Successfully inserted all {inserted_count} documents" + elif inserted_count > 0: + status = "partial_success" + status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents" + if failed_files: + status_message += f". Failed files: {', '.join(failed_files)}" + else: + status = "failure" + status_message = "No documents were successfully inserted" + if failed_files: + status_message += f". Failed files: {', '.join(failed_files)}" + + return InsertResponse(status=status, message=status_message) + + except Exception as e: + logging.error(f"Error /documents/batch: {file.filename}: {str(e)}") + logging.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + @app.delete( + "/documents", + response_model=InsertResponse, + dependencies=[Depends(optional_api_key)], + ) + async def clear_documents(): + """ + Clear all documents from the LightRAG system. + + This endpoint deletes all text chunks, entities vector database, and relationships vector database, + effectively clearing all documents from the LightRAG system. + + Returns: + InsertResponse: A response object containing the status, message, and the new document count (0 in this case). + """ + try: + rag.text_chunks = [] + rag.entities_vdb = None + rag.relationships_vdb = None + return InsertResponse( + status="success", message="All documents cleared successfully" + ) + except Exception as e: + logging.error(f"Error DELETE /documents: {str(e)}") + logging.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.post( @@ -1381,255 +1611,6 @@ def create_app(args): trace_exception(e) raise HTTPException(status_code=500, detail=str(e)) - @app.post( - "/documents/text", - response_model=InsertResponse, - dependencies=[Depends(optional_api_key)], - ) - async def insert_text(request: InsertTextRequest): - """ - Insert text into the Retrieval-Augmented Generation (RAG) system. - - This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses. - - Args: - request (InsertTextRequest): The request body containing the text to be inserted. - - Returns: - InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted. - """ - try: - await rag.ainsert(request.text) - return InsertResponse( - status="success", - message="Text successfully inserted", - document_count=1, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - @app.post( - "/documents/file", - response_model=InsertResponse, - dependencies=[Depends(optional_api_key)], - ) - async def insert_file(file: UploadFile = File(...), description: str = Form(None)): - """Insert a file directly into the RAG system - - Args: - file: Uploaded file - description: Optional description of the file - - Returns: - InsertResponse: Status of the insertion operation - - Raises: - HTTPException: For unsupported file types or processing errors - """ - try: - content = "" - # Get file extension in lowercase - ext = Path(file.filename).suffix.lower() - - match ext: - case ".txt" | ".md": - # Text files handling - text_content = await file.read() - content = text_content.decode("utf-8") - - case ".pdf" | ".docx" | ".pptx" | ".xlsx": - if not pm.is_installed("docling"): - pm.install("docling") - from docling.document_converter import DocumentConverter - - # Create a temporary file to save the uploaded content - temp_path = Path("temp") / file.filename - temp_path.parent.mkdir(exist_ok=True) - - # Save the uploaded file - with temp_path.open("wb") as f: - f.write(await file.read()) - - try: - - async def convert_doc(): - def sync_convert(): - converter = DocumentConverter() - result = converter.convert(str(temp_path)) - return result.document.export_to_markdown() - - return await asyncio.to_thread(sync_convert) - - content = await convert_doc() - finally: - # Clean up the temporary file - temp_path.unlink() - - # Insert content into RAG system - if content: - # Add description if provided - if description: - content = f"{description}\n\n{content}" - - await rag.ainsert(content) - logging.info(f"Successfully indexed file: {file.filename}") - - return InsertResponse( - status="success", - message=f"File '{file.filename}' successfully inserted", - document_count=1, - ) - else: - raise HTTPException( - status_code=400, - detail="No content could be extracted from the file", - ) - - except UnicodeDecodeError: - raise HTTPException(status_code=400, detail="File encoding not supported") - except Exception as e: - logging.error(f"Error processing file {file.filename}: {str(e)}") - raise HTTPException(status_code=500, detail=str(e)) - - @app.post( - "/documents/batch", - response_model=InsertResponse, - dependencies=[Depends(optional_api_key)], - ) - async def insert_batch(files: List[UploadFile] = File(...)): - """Process multiple files in batch mode - - Args: - files: List of files to process - - Returns: - InsertResponse: Status of the batch insertion operation - - Raises: - HTTPException: For processing errors - """ - try: - inserted_count = 0 - failed_files = [] - - for file in files: - try: - content = "" - ext = Path(file.filename).suffix.lower() - - match ext: - case ".txt" | ".md": - text_content = await file.read() - content = text_content.decode("utf-8") - - case ".pdf": - if not pm.is_installed("pypdf2"): - pm.install("pypdf2") - from PyPDF2 import PdfReader - from io import BytesIO - - pdf_content = await file.read() - pdf_file = BytesIO(pdf_content) - reader = PdfReader(pdf_file) - for page in reader.pages: - content += page.extract_text() + "\n" - - case ".docx": - if not pm.is_installed("docx"): - pm.install("docx") - from docx import Document - from io import BytesIO - - docx_content = await file.read() - docx_file = BytesIO(docx_content) - doc = Document(docx_file) - content = "\n".join( - [paragraph.text for paragraph in doc.paragraphs] - ) - - case ".pptx": - if not pm.is_installed("pptx"): - pm.install("pptx") - from pptx import Presentation # type: ignore - from io import BytesIO - - pptx_content = await file.read() - pptx_file = BytesIO(pptx_content) - prs = Presentation(pptx_file) - for slide in prs.slides: - for shape in slide.shapes: - if hasattr(shape, "text"): - content += shape.text + "\n" - - case _: - failed_files.append(f"{file.filename} (unsupported type)") - continue - - if content: - await rag.ainsert(content) - inserted_count += 1 - logging.info(f"Successfully indexed file: {file.filename}") - else: - failed_files.append(f"{file.filename} (no content extracted)") - - except UnicodeDecodeError: - failed_files.append(f"{file.filename} (encoding error)") - except Exception as e: - failed_files.append(f"{file.filename} ({str(e)})") - logging.error(f"Error processing file {file.filename}: {str(e)}") - - # Prepare status message - if inserted_count == len(files): - status = "success" - status_message = f"Successfully inserted all {inserted_count} documents" - elif inserted_count > 0: - status = "partial_success" - status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents" - if failed_files: - status_message += f". Failed files: {', '.join(failed_files)}" - else: - status = "failure" - status_message = "No documents were successfully inserted" - if failed_files: - status_message += f". Failed files: {', '.join(failed_files)}" - - return InsertResponse( - status=status, - message=status_message, - document_count=inserted_count, - ) - - except Exception as e: - logging.error(f"Batch processing error: {str(e)}") - raise HTTPException(status_code=500, detail=str(e)) - - @app.delete( - "/documents", - response_model=InsertResponse, - dependencies=[Depends(optional_api_key)], - ) - async def clear_documents(): - """ - Clear all documents from the LightRAG system. - - This endpoint deletes all text chunks, entities vector database, and relationships vector database, - effectively clearing all documents from the LightRAG system. - - Returns: - InsertResponse: A response object containing the status, message, and the new document count (0 in this case). - """ - try: - rag.text_chunks = [] - rag.entities_vdb = None - rag.relationships_vdb = None - return InsertResponse( - status="success", - message="All documents cleared successfully", - document_count=0, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - # query all graph labels @app.get("/graph/label/list") async def get_graph_labels():