Merge pull request #587 from ParisNeo/main

Enhancement: Multi-format document support and dynamic dependency management #565
This commit is contained in:
zrguo
2025-01-15 11:12:29 +08:00
committed by GitHub
2 changed files with 263 additions and 40 deletions

View File

@@ -9,7 +9,7 @@ from lightrag.llm import openai_complete_if_cache, openai_embedding
from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding
from lightrag.utils import EmbeddingFunc from lightrag.utils import EmbeddingFunc
from typing import Optional, List from typing import Optional, List, Union
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
import shutil import shutil
@@ -22,6 +22,7 @@ from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from starlette.status import HTTP_403_FORBIDDEN from starlette.status import HTTP_403_FORBIDDEN
import pipmaster as pm
def get_default_host(binding_type: str) -> str: def get_default_host(binding_type: str) -> str:
@@ -174,7 +175,11 @@ def parse_args():
class DocumentManager: class DocumentManager:
"""Handles document operations and tracking""" """Handles document operations and tracking"""
def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md")): def __init__(
self,
input_dir: str,
supported_extensions: tuple = (".txt", ".md", ".pdf", ".docx", ".pptx"),
):
self.input_dir = Path(input_dir) self.input_dir = Path(input_dir)
self.supported_extensions = supported_extensions self.supported_extensions = supported_extensions
self.indexed_files = set() self.indexed_files = set()
@@ -289,7 +294,7 @@ def create_app(args):
+ "(With authentication)" + "(With authentication)"
if api_key if api_key
else "", else "",
version="1.0.1", version="1.0.2",
openapi_tags=[{"name": "api"}], openapi_tags=[{"name": "api"}],
) )
@@ -356,6 +361,80 @@ def create_app(args):
), ),
) )
async def index_file(file_path: Union[str, Path]) -> None:
"""Index all files inside the folder with support for multiple file formats
Args:
file_path: Path to the file to be indexed (str or Path object)
Raises:
ValueError: If file format is not supported
FileNotFoundError: If file doesn't exist
"""
if not pm.is_installed("aiofiles"):
pm.install("aiofiles")
# Convert to Path object if string
file_path = Path(file_path)
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
content = ""
# Get file extension in lowercase
ext = file_path.suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
# PDF handling
reader = PdfReader(str(file_path))
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
# Word document handling
doc = Document(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
# PowerPoint handling
prs = Presentation(file_path)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise ValueError(f"Unsupported file format: {ext}")
# Insert content into RAG system
if content:
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Successfully indexed file: {file_path}")
else:
logging.warning(f"No content extracted from file: {file_path}")
@app.on_event("startup") @app.on_event("startup")
async def startup_event(): async def startup_event():
"""Index all files in input directory during startup""" """Index all files in input directory during startup"""
@@ -363,13 +442,7 @@ def create_app(args):
new_files = doc_manager.scan_directory() new_files = doc_manager.scan_directory()
for file_path in new_files: for file_path in new_files:
try: try:
# Use async file reading await index_file(file_path)
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
# Use the async version of insert directly
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Indexed file: {file_path}")
except Exception as e: except Exception as e:
trace_exception(e) trace_exception(e)
logging.error(f"Error indexing file {file_path}: {str(e)}") logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -388,10 +461,7 @@ def create_app(args):
for file_path in new_files: for file_path in new_files:
try: try:
with open(file_path, "r", encoding="utf-8") as f: await index_file(file_path)
content = f.read()
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
indexed_count += 1 indexed_count += 1
except Exception as e: except Exception as e:
logging.error(f"Error indexing file {file_path}: {str(e)}") logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -419,10 +489,7 @@ def create_app(args):
shutil.copyfileobj(file.file, buffer) shutil.copyfileobj(file.file, buffer)
# Immediately index the uploaded file # Immediately index the uploaded file
with open(file_path, "r", encoding="utf-8") as f: await index_file(file_path)
content = f.read()
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
return { return {
"status": "success", "status": "success",
@@ -498,26 +565,103 @@ def create_app(args):
dependencies=[Depends(optional_api_key)], dependencies=[Depends(optional_api_key)],
) )
async def insert_file(file: UploadFile = File(...), description: str = Form(None)): async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
try: """Insert a file directly into the RAG system
content = await file.read()
if file.filename.endswith((".txt", ".md")): Args:
text = content.decode("utf-8") file: Uploaded file
await rag.ainsert(text) description: Optional description of the file
else:
Returns:
InsertResponse: Status of the insertion operation
Raises:
HTTPException: For unsupported file types or processing errors
"""
try:
content = ""
# Get file extension in lowercase
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
# Read PDF from memory
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
# Read DOCX from memory
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
# Read PPTX from memory
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail="Unsupported file type. Only .txt and .md files are supported", detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
) )
# Insert content into RAG system
if content:
# Add description if provided
if description:
content = f"{description}\n\n{content}"
await rag.ainsert(content)
logging.info(f"Successfully indexed file: {file.filename}")
return InsertResponse( return InsertResponse(
status="success", status="success",
message=f"File '{file.filename}' successfully inserted", message=f"File '{file.filename}' successfully inserted",
document_count=1, document_count=1,
) )
else:
raise HTTPException(
status_code=400,
detail="No content could be extracted from the file",
)
except UnicodeDecodeError: except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File encoding not supported") raise HTTPException(status_code=400, detail="File encoding not supported")
except Exception as e: except Exception as e:
logging.error(f"Error processing file {file.filename}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post( @app.post(
@@ -526,32 +670,110 @@ def create_app(args):
dependencies=[Depends(optional_api_key)], dependencies=[Depends(optional_api_key)],
) )
async def insert_batch(files: List[UploadFile] = File(...)): async def insert_batch(files: List[UploadFile] = File(...)):
"""Process multiple files in batch mode
Args:
files: List of files to process
Returns:
InsertResponse: Status of the batch insertion operation
Raises:
HTTPException: For processing errors
"""
try: try:
inserted_count = 0 inserted_count = 0
failed_files = [] failed_files = []
for file in files: for file in files:
try: try:
content = await file.read() content = ""
if file.filename.endswith((".txt", ".md")): ext = Path(file.filename).suffix.lower()
text = content.decode("utf-8")
await rag.ainsert(text) match ext:
inserted_count += 1 case ".txt" | ".md":
else: text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
failed_files.append(f"{file.filename} (unsupported type)") failed_files.append(f"{file.filename} (unsupported type)")
continue
if content:
await rag.ainsert(content)
inserted_count += 1
logging.info(f"Successfully indexed file: {file.filename}")
else:
failed_files.append(f"{file.filename} (no content extracted)")
except UnicodeDecodeError:
failed_files.append(f"{file.filename} (encoding error)")
except Exception as e: except Exception as e:
failed_files.append(f"{file.filename} ({str(e)})") failed_files.append(f"{file.filename} ({str(e)})")
logging.error(f"Error processing file {file.filename}: {str(e)}")
status_message = f"Successfully inserted {inserted_count} documents" # Prepare status message
if inserted_count == len(files):
status = "success"
status_message = f"Successfully inserted all {inserted_count} documents"
elif inserted_count > 0:
status = "partial_success"
status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
else:
status = "failure"
status_message = "No documents were successfully inserted"
if failed_files: if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}" status_message += f". Failed files: {', '.join(failed_files)}"
return InsertResponse( return InsertResponse(
status="success" if inserted_count > 0 else "partial_success", status=status,
message=status_message, message=status_message,
document_count=len(files), document_count=inserted_count,
) )
except Exception as e: except Exception as e:
logging.error(f"Batch processing error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.delete( @app.delete(

View File

@@ -7,6 +7,7 @@ nest_asyncio
numpy numpy
ollama ollama
openai openai
pipmaster
python-dotenv python-dotenv
python-multipart python-multipart
tenacity tenacity