diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 5bcb149c..d9f7bf06 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -9,7 +9,7 @@ from lightrag.llm import openai_complete_if_cache, openai_embedding from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding from lightrag.utils import EmbeddingFunc -from typing import Optional, List +from typing import Optional, List, Union from enum import Enum from pathlib import Path import shutil @@ -22,6 +22,7 @@ from fastapi.security import APIKeyHeader from fastapi.middleware.cors import CORSMiddleware from starlette.status import HTTP_403_FORBIDDEN +import pipmaster as pm def get_default_host(binding_type: str) -> str: @@ -174,7 +175,7 @@ def parse_args(): class DocumentManager: """Handles document operations and tracking""" - def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md")): + def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md", ".pdf", ".docx", ".pptx")): self.input_dir = Path(input_dir) self.supported_extensions = supported_extensions self.indexed_files = set() @@ -289,7 +290,7 @@ def create_app(args): + "(With authentication)" if api_key else "", - version="1.0.1", + version="1.0.2", openapi_tags=[{"name": "api"}], ) @@ -356,6 +357,85 @@ def create_app(args): ), ) + + + async def index_file(file_path: Union[str, Path]) -> None: + """ Index all files inside the folder with support for multiple file formats + + Args: + file_path: Path to the file to be indexed (str or Path object) + + Raises: + ValueError: If file format is not supported + FileNotFoundError: If file doesn't exist + """ + if not pm.is_installed("aiofiles"): + pm.install("aiofiles") + import aiofiles + + + # Convert to Path object if string + file_path = Path(file_path) + + # Check if file exists + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + content = "" + # Get file extension in lowercase + ext = file_path.suffix.lower() + + match ext: + case ".txt" | ".md": + # Text files handling + async with aiofiles.open(file_path, "r", encoding="utf-8") as f: + content = await f.read() + + case ".pdf": + if not pm.is_installed("pypdf2"): + pm.install("pypdf2") + from pypdf2 import PdfReader + # PDF handling + reader = PdfReader(str(file_path)) + content = "" + for page in reader.pages: + content += page.extract_text() + "\n" + + case ".docx": + if not pm.is_installed("docx"): + pm.install("docx") + from docx import Document + + # Word document handling + doc = Document(file_path) + content = "\n".join([paragraph.text for paragraph in doc.paragraphs]) + + case ".pptx": + if not pm.is_installed("pptx"): + pm.install("pptx") + from pptx import Presentation + # PowerPoint handling + prs = Presentation(file_path) + content = "" + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" + + case _: + raise ValueError(f"Unsupported file format: {ext}") + + # Insert content into RAG system + if content: + await rag.ainsert(content) + doc_manager.mark_as_indexed(file_path) + logging.info(f"Successfully indexed file: {file_path}") + else: + logging.warning(f"No content extracted from file: {file_path}") + + + + @app.on_event("startup") async def startup_event(): """Index all files in input directory during startup""" @@ -363,13 +443,7 @@ def create_app(args): new_files = doc_manager.scan_directory() for file_path in new_files: try: - # Use async file reading - async with aiofiles.open(file_path, "r", encoding="utf-8") as f: - content = await f.read() - # Use the async version of insert directly - await rag.ainsert(content) - doc_manager.mark_as_indexed(file_path) - logging.info(f"Indexed file: {file_path}") + await index_file(file_path) except Exception as e: trace_exception(e) logging.error(f"Error indexing file {file_path}: {str(e)}") @@ -388,11 +462,8 @@ def create_app(args): for file_path in new_files: try: - with open(file_path, "r", encoding="utf-8") as f: - content = f.read() - await rag.ainsert(content) - doc_manager.mark_as_indexed(file_path) - indexed_count += 1 + await index_file(file_path) + indexed_count += 1 except Exception as e: logging.error(f"Error indexing file {file_path}: {str(e)}") @@ -419,10 +490,7 @@ def create_app(args): shutil.copyfileobj(file.file, buffer) # Immediately index the uploaded file - with open(file_path, "r", encoding="utf-8") as f: - content = f.read() - await rag.ainsert(content) - doc_manager.mark_as_indexed(file_path) + await index_file(file_path) return { "status": "success", @@ -491,69 +559,219 @@ def create_app(args): ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) - @app.post( "/documents/file", response_model=InsertResponse, dependencies=[Depends(optional_api_key)], ) async def insert_file(file: UploadFile = File(...), description: str = Form(None)): + """Insert a file directly into the RAG system + + Args: + file: Uploaded file + description: Optional description of the file + + Returns: + InsertResponse: Status of the insertion operation + + Raises: + HTTPException: For unsupported file types or processing errors + """ try: - content = await file.read() + content = "" + # Get file extension in lowercase + ext = Path(file.filename).suffix.lower() + + match ext: + case ".txt" | ".md": + # Text files handling + text_content = await file.read() + content = text_content.decode("utf-8") + + case ".pdf": + if not pm.is_installed("pypdf2"): + pm.install("pypdf2") + from pypdf2 import PdfReader + from io import BytesIO + + # Read PDF from memory + pdf_content = await file.read() + pdf_file = BytesIO(pdf_content) + reader = PdfReader(pdf_file) + content = "" + for page in reader.pages: + content += page.extract_text() + "\n" + + case ".docx": + if not pm.is_installed("docx"): + pm.install("docx") + from docx import Document + from io import BytesIO + + # Read DOCX from memory + docx_content = await file.read() + docx_file = BytesIO(docx_content) + doc = Document(docx_file) + content = "\n".join([paragraph.text for paragraph in doc.paragraphs]) + + case ".pptx": + if not pm.is_installed("pptx"): + pm.install("pptx") + from pptx import Presentation + from io import BytesIO + + # Read PPTX from memory + pptx_content = await file.read() + pptx_file = BytesIO(pptx_content) + prs = Presentation(pptx_file) + content = "" + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" + + case _: + raise HTTPException( + status_code=400, + detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}", + ) - if file.filename.endswith((".txt", ".md")): - text = content.decode("utf-8") - await rag.ainsert(text) + # Insert content into RAG system + if content: + # Add description if provided + if description: + content = f"{description}\n\n{content}" + + await rag.ainsert(content) + logging.info(f"Successfully indexed file: {file.filename}") + + return InsertResponse( + status="success", + message=f"File '{file.filename}' successfully inserted", + document_count=1, + ) else: raise HTTPException( status_code=400, - detail="Unsupported file type. Only .txt and .md files are supported", + detail="No content could be extracted from the file", ) - return InsertResponse( - status="success", - message=f"File '{file.filename}' successfully inserted", - document_count=1, - ) except UnicodeDecodeError: raise HTTPException(status_code=400, detail="File encoding not supported") except Exception as e: + logging.error(f"Error processing file {file.filename}: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) - @app.post( "/documents/batch", response_model=InsertResponse, dependencies=[Depends(optional_api_key)], ) async def insert_batch(files: List[UploadFile] = File(...)): + """Process multiple files in batch mode + + Args: + files: List of files to process + + Returns: + InsertResponse: Status of the batch insertion operation + + Raises: + HTTPException: For processing errors + """ try: inserted_count = 0 failed_files = [] for file in files: try: - content = await file.read() - if file.filename.endswith((".txt", ".md")): - text = content.decode("utf-8") - await rag.ainsert(text) + content = "" + ext = Path(file.filename).suffix.lower() + + match ext: + case ".txt" | ".md": + text_content = await file.read() + content = text_content.decode("utf-8") + + case ".pdf": + if not pm.is_installed("pypdf2"): + pm.install("pypdf2") + from pypdf2 import PdfReader + from io import BytesIO + + pdf_content = await file.read() + pdf_file = BytesIO(pdf_content) + reader = PdfReader(pdf_file) + for page in reader.pages: + content += page.extract_text() + "\n" + + case ".docx": + if not pm.is_installed("docx"): + pm.install("docx") + from docx import Document + from io import BytesIO + + docx_content = await file.read() + docx_file = BytesIO(docx_content) + doc = Document(docx_file) + content = "\n".join([paragraph.text for paragraph in doc.paragraphs]) + + case ".pptx": + if not pm.is_installed("pptx"): + pm.install("pptx") + from pptx import Presentation + from io import BytesIO + + pptx_content = await file.read() + pptx_file = BytesIO(pptx_content) + prs = Presentation(pptx_file) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + content += shape.text + "\n" + + case _: + failed_files.append(f"{file.filename} (unsupported type)") + continue + + if content: + await rag.ainsert(content) inserted_count += 1 + logging.info(f"Successfully indexed file: {file.filename}") else: - failed_files.append(f"{file.filename} (unsupported type)") + failed_files.append(f"{file.filename} (no content extracted)") + + except UnicodeDecodeError: + failed_files.append(f"{file.filename} (encoding error)") except Exception as e: failed_files.append(f"{file.filename} ({str(e)})") + logging.error(f"Error processing file {file.filename}: {str(e)}") - status_message = f"Successfully inserted {inserted_count} documents" - if failed_files: - status_message += f". Failed files: {', '.join(failed_files)}" + # Prepare status message + if inserted_count == len(files): + status = "success" + status_message = f"Successfully inserted all {inserted_count} documents" + elif inserted_count > 0: + status = "partial_success" + status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents" + if failed_files: + status_message += f". Failed files: {', '.join(failed_files)}" + else: + status = "failure" + status_message = "No documents were successfully inserted" + if failed_files: + status_message += f". Failed files: {', '.join(failed_files)}" return InsertResponse( - status="success" if inserted_count > 0 else "partial_success", + status=status, message=status_message, - document_count=len(files), + document_count=inserted_count, ) + except Exception as e: + logging.error(f"Batch processing error: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) + @app.delete( "/documents", response_model=InsertResponse, diff --git a/lightrag/api/requirements.txt b/lightrag/api/requirements.txt index 221d7f40..b8fc41b2 100644 --- a/lightrag/api/requirements.txt +++ b/lightrag/api/requirements.txt @@ -15,3 +15,4 @@ torch tqdm transformers uvicorn +pipmaster \ No newline at end of file