Added more file types support

This commit is contained in:
Saifeddine ALOUI
2025-01-14 23:08:39 +01:00
parent 61349b0fe7
commit c3aba5423f
2 changed files with 260 additions and 41 deletions

View File

@@ -9,7 +9,7 @@ from lightrag.llm import openai_complete_if_cache, openai_embedding
from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding
from lightrag.utils import EmbeddingFunc from lightrag.utils import EmbeddingFunc
from typing import Optional, List from typing import Optional, List, Union
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
import shutil import shutil
@@ -22,6 +22,7 @@ from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from starlette.status import HTTP_403_FORBIDDEN from starlette.status import HTTP_403_FORBIDDEN
import pipmaster as pm
def get_default_host(binding_type: str) -> str: def get_default_host(binding_type: str) -> str:
@@ -174,7 +175,7 @@ def parse_args():
class DocumentManager: class DocumentManager:
"""Handles document operations and tracking""" """Handles document operations and tracking"""
def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md")): def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md", ".pdf", ".docx", ".pptx")):
self.input_dir = Path(input_dir) self.input_dir = Path(input_dir)
self.supported_extensions = supported_extensions self.supported_extensions = supported_extensions
self.indexed_files = set() self.indexed_files = set()
@@ -289,7 +290,7 @@ def create_app(args):
+ "(With authentication)" + "(With authentication)"
if api_key if api_key
else "", else "",
version="1.0.1", version="1.0.2",
openapi_tags=[{"name": "api"}], openapi_tags=[{"name": "api"}],
) )
@@ -356,6 +357,85 @@ def create_app(args):
), ),
) )
async def index_file(file_path: Union[str, Path]) -> None:
""" Index all files inside the folder with support for multiple file formats
Args:
file_path: Path to the file to be indexed (str or Path object)
Raises:
ValueError: If file format is not supported
FileNotFoundError: If file doesn't exist
"""
if not pm.is_installed("aiofiles"):
pm.install("aiofiles")
import aiofiles
# Convert to Path object if string
file_path = Path(file_path)
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
content = ""
# Get file extension in lowercase
ext = file_path.suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
# PDF handling
reader = PdfReader(str(file_path))
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
# Word document handling
doc = Document(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
# PowerPoint handling
prs = Presentation(file_path)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise ValueError(f"Unsupported file format: {ext}")
# Insert content into RAG system
if content:
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Successfully indexed file: {file_path}")
else:
logging.warning(f"No content extracted from file: {file_path}")
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
"""Index all files in input directory during startup""" """Index all files in input directory during startup"""
@@ -363,13 +443,7 @@ def create_app(args):
new_files = doc_manager.scan_directory() new_files = doc_manager.scan_directory()
for file_path in new_files: for file_path in new_files:
try: try:
# Use async file reading await index_file(file_path)
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
# Use the async version of insert directly
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Indexed file: {file_path}")
except Exception as e: except Exception as e:
trace_exception(e) trace_exception(e)
logging.error(f"Error indexing file {file_path}: {str(e)}") logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -388,11 +462,8 @@ def create_app(args):
for file_path in new_files: for file_path in new_files:
try: try:
with open(file_path, "r", encoding="utf-8") as f: await index_file(file_path)
content = f.read() indexed_count += 1
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
indexed_count += 1
except Exception as e: except Exception as e:
logging.error(f"Error indexing file {file_path}: {str(e)}") logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -419,10 +490,7 @@ def create_app(args):
shutil.copyfileobj(file.file, buffer) shutil.copyfileobj(file.file, buffer)
# Immediately index the uploaded file # Immediately index the uploaded file
with open(file_path, "r", encoding="utf-8") as f: await index_file(file_path)
content = f.read()
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
return { return {
"status": "success", "status": "success",
@@ -491,69 +559,219 @@ def create_app(args):
) )
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post( @app.post(
"/documents/file", "/documents/file",
response_model=InsertResponse, response_model=InsertResponse,
dependencies=[Depends(optional_api_key)], dependencies=[Depends(optional_api_key)],
) )
async def insert_file(file: UploadFile = File(...), description: str = Form(None)): async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
try: """Insert a file directly into the RAG system
content = await file.read()
if file.filename.endswith((".txt", ".md")): Args:
text = content.decode("utf-8") file: Uploaded file
await rag.ainsert(text) description: Optional description of the file
Returns:
InsertResponse: Status of the insertion operation
Raises:
HTTPException: For unsupported file types or processing errors
"""
try:
content = ""
# Get file extension in lowercase
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
# Read PDF from memory
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
# Read DOCX from memory
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
# Read PPTX from memory
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
)
# Insert content into RAG system
if content:
# Add description if provided
if description:
content = f"{description}\n\n{content}"
await rag.ainsert(content)
logging.info(f"Successfully indexed file: {file.filename}")
return InsertResponse(
status="success",
message=f"File '{file.filename}' successfully inserted",
document_count=1,
)
else: else:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="Unsupported file type. Only .txt and .md files are supported", detail="No content could be extracted from the file",
) )
return InsertResponse(
status="success",
message=f"File '{file.filename}' successfully inserted",
document_count=1,
)
except UnicodeDecodeError: except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File encoding not supported") raise HTTPException(status_code=400, detail="File encoding not supported")
except Exception as e: except Exception as e:
logging.error(f"Error processing file {file.filename}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post( @app.post(
"/documents/batch", "/documents/batch",
response_model=InsertResponse, response_model=InsertResponse,
dependencies=[Depends(optional_api_key)], dependencies=[Depends(optional_api_key)],
) )
async def insert_batch(files: List[UploadFile] = File(...)): async def insert_batch(files: List[UploadFile] = File(...)):
"""Process multiple files in batch mode
Args:
files: List of files to process
Returns:
InsertResponse: Status of the batch insertion operation
Raises:
HTTPException: For processing errors
"""
try: try:
inserted_count = 0 inserted_count = 0
failed_files = [] failed_files = []
for file in files: for file in files:
try: try:
content = await file.read() content = ""
if file.filename.endswith((".txt", ".md")): ext = Path(file.filename).suffix.lower()
text = content.decode("utf-8")
await rag.ainsert(text) match ext:
case ".txt" | ".md":
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
failed_files.append(f"{file.filename} (unsupported type)")
continue
if content:
await rag.ainsert(content)
inserted_count += 1 inserted_count += 1
logging.info(f"Successfully indexed file: {file.filename}")
else: else:
failed_files.append(f"{file.filename} (unsupported type)") failed_files.append(f"{file.filename} (no content extracted)")
except UnicodeDecodeError:
failed_files.append(f"{file.filename} (encoding error)")
except Exception as e: except Exception as e:
failed_files.append(f"{file.filename} ({str(e)})") failed_files.append(f"{file.filename} ({str(e)})")
logging.error(f"Error processing file {file.filename}: {str(e)}")
status_message = f"Successfully inserted {inserted_count} documents" # Prepare status message
if failed_files: if inserted_count == len(files):
status_message += f". Failed files: {', '.join(failed_files)}" status = "success"
status_message = f"Successfully inserted all {inserted_count} documents"
elif inserted_count > 0:
status = "partial_success"
status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
else:
status = "failure"
status_message = "No documents were successfully inserted"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
return InsertResponse( return InsertResponse(
status="success" if inserted_count > 0 else "partial_success", status=status,
message=status_message, message=status_message,
document_count=len(files), document_count=inserted_count,
) )
except Exception as e: except Exception as e:
logging.error(f"Batch processing error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.delete( @app.delete(
"/documents", "/documents",
response_model=InsertResponse, response_model=InsertResponse,

View File

@@ -15,3 +15,4 @@ torch
tqdm tqdm
transformers transformers
uvicorn uvicorn
pipmaster