Merge remote-tracking branch 'origin/main' into make-clear-what-implemented-or-not

# Conflicts:
#	lightrag/base.py
#	lightrag/kg/json_doc_status_impl.py
#	lightrag/kg/mongo_impl.py
#	lightrag/kg/postgres_impl.py
This commit is contained in:
Yannick Stephan
2025-02-16 15:29:16 +01:00
9 changed files with 536 additions and 447 deletions

View File

@@ -1,19 +1,20 @@
### Server Configuration
#HOST=0.0.0.0
#PORT=9621
#NAMESPACE_PREFIX=lightrag # separates data from different LightRAG instances
# HOST=0.0.0.0
# PORT=9621
# NAMESPACE_PREFIX=lightrag # separates data from different LightRAG instances
# CORS_ORIGINS=http://localhost:3000,http://localhost:8080
### Optional SSL Configuration
#SSL=true
#SSL_CERTFILE=/path/to/cert.pem
#SSL_KEYFILE=/path/to/key.pem
# SSL=true
# SSL_CERTFILE=/path/to/cert.pem
# SSL_KEYFILE=/path/to/key.pem
### Security (leave empty if no API key is needed)
# LIGHTRAG_API_KEY=your-secure-api-key-here
### Directory Configuration
# WORKING_DIR=./rag_storage
# INPUT_DIR=./inputs
# WORKING_DIR=<absolute_path_for_working_dir>
# INPUT_DIR=<absolute_path_for_doc_input_dir>
### Logging level
LOG_LEVEL=INFO

View File

@@ -74,30 +74,38 @@ LLM_MODEL=model_name_of_azure_ai
LLM_BINDING_API_KEY=api_key_of_azure_ai
```
### About Ollama API
### 3. Install Lightrag as a Linux Service
We provide an Ollama-compatible interface for LightRAG, aiming to emulate LightRAG as an Ollama chat model. This allows AI chat frontends that support Ollama, such as Open WebUI, to access LightRAG easily.
Create your service file `lightrag.service` from the sample file `lightrag.service.example`. Modify the WorkingDirectory and ExecStart entries in the service file:
#### Choose Query mode in chat
A query prefix in the query string determines which LightRAG query mode is used to generate the response. The supported prefixes include:
```
/local
/global
/hybrid
/naive
/mix
/bypass
```text
Description=LightRAG Ollama Service
WorkingDirectory=<lightrag installed directory>
ExecStart=<lightrag installed directory>/lightrag/api/lightrag-api
```
For example, the chat message "/mix 唐僧有几个徒弟" ("How many disciples does Tang Seng have?") will trigger a mix-mode query in LightRAG. A chat message without a query prefix will trigger a hybrid-mode query by default.
Modify your service startup script `lightrag-api`. Change the Python virtual environment activation command as needed:
"/bypass" is not a LightRAG query mode, it will tell API Server to pass the query directly to the underlying LLM with chat history. So user can use LLM to answer question base on the LightRAG query results. (If you are using Open WebUI as front end, you can just switch the model to a normal LLM instead of using /bypass prefix)
```shell
#!/bin/bash
# your python virtual environment activation
source /home/netman/lightrag-xyj/venv/bin/activate
# start lightrag api server
lightrag-server
```
Install the LightRAG service. If your system is Ubuntu, the following commands will work:
```shell
sudo cp lightrag.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start lightrag.service
sudo systemctl status lightrag.service
sudo systemctl enable lightrag.service
```
#### Connect Open WebUI to LightRAG
After starting lightrag-server, you can add an Ollama-type connection in the Open WebUI admin panel. A model named lightrag:latest will then appear in Open WebUI's model management interface, and users can send queries to LightRAG through the chat interface.
## Configuration
@@ -379,7 +387,7 @@ curl -X DELETE "http://localhost:9621/documents"
#### GET /api/version
Get Ollama version information
Get Ollama version information.
```bash
curl http://localhost:9621/api/version
@@ -387,7 +395,7 @@ curl http://localhost:9621/api/version
#### GET /api/tags
Get Ollama available models
Get Ollama available models.
```bash
curl http://localhost:9621/api/tags
@@ -395,7 +403,7 @@ curl http://localhost:9621/api/tags
#### POST /api/chat
Handle chat completion requests
Handle chat completion requests. Routes user queries through LightRAG, selecting the query mode based on the query prefix. Detects OpenWebUI session-related requests (for metadata generation tasks) and forwards them directly to the underlying LLM.
```shell
curl -N -X POST http://localhost:9621/api/chat -H "Content-Type: application/json" -d \
@@ -404,6 +412,10 @@ curl -N -X POST http://localhost:9621/api/chat -H "Content-Type: application/jso
> For more information about the Ollama API, please visit: [Ollama API documentation](https://github.com/ollama/ollama/blob/main/docs/api.md)
#### POST /api/generate
Handle generate completion requests. For compatibility purposes, the request is not processed by LightRAG and is handled directly by the underlying LLM.
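For illustration only, a generate request using the standard Ollama payload format might look like this (the prompt is a placeholder):

```shell
curl -X POST http://localhost:9621/api/generate -H "Content-Type: application/json" -d \
  '{"model": "lightrag:latest", "prompt": "Summarize what LightRAG does in one sentence.", "stream": false}'
```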
### Utility Endpoints
#### GET /health
@@ -413,7 +425,35 @@ Check server health and configuration.
curl "http://localhost:9621/health"
```
## Ollama Emulation
We provide an Ollama-compatible interface for LightRAG, aiming to emulate LightRAG as an Ollama chat model. This allows AI chat frontends that support Ollama, such as Open WebUI, to access LightRAG easily.
### Connect Open WebUI to LightRAG
After starting lightrag-server, you can add an Ollama-type connection in the Open WebUI admin panel. A model named lightrag:latest will then appear in Open WebUI's model management interface, and users can send queries to LightRAG through the chat interface. It is best to install LightRAG as a service for this use case.
Open WebUI uses an LLM for its session title and session keyword generation tasks, so the Ollama chat completion API detects OpenWebUI session-related requests and forwards them directly to the underlying LLM.
### Choose Query mode in chat
A query prefix in the query string determines which LightRAG query mode is used to generate the response. The supported prefixes include:
```
/local
/global
/hybrid
/naive
/mix
/bypass
```
For example, the chat message "/mix 唐僧有几个徒弟" ("How many disciples does Tang Seng have?") will trigger a mix-mode query in LightRAG. A chat message without a query prefix will trigger a hybrid-mode query by default.
"/bypass" is not a LightRAG query mode, it will tell API Server to pass the query directly to the underlying LLM with chat history. So user can use LLM to answer question base on the chat history. If you are using Open WebUI as front end, you can just switch the model to a normal LLM instead of using /bypass prefix.
## Development
Contribute to the project: [Guide](contributor-readme.MD)
### Running in Development Mode
@@ -471,34 +511,3 @@ This intelligent caching mechanism:
- Only new documents in the input directory will be processed
- This optimization significantly reduces startup time for subsequent runs
- The working directory (`--working-dir`) stores the vectorized documents database
## Install Lightrag as a Linux Service
Create your service file `lightrag.service` from the sample file `lightrag.service.example`. Modify the WorkingDirectory and ExecStart entries in the service file:
```text
Description=LightRAG Ollama Service
WorkingDirectory=<lightrag installed directory>
ExecStart=<lightrag installed directory>/lightrag/api/lightrag-api
```
Modify your service startup script `lightrag-api`. Change the Python virtual environment activation command as needed:
```shell
#!/bin/bash
# your python virtual environment activation
source /home/netman/lightrag-xyj/venv/bin/activate
# start lightrag api server
lightrag-server
```
Install the LightRAG service. If your system is Ubuntu, the following commands will work:
```shell
sudo cp lightrag.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl start lightrag.service
sudo systemctl status lightrag.service
sudo systemctl enable lightrag.service
```

View File

@@ -3,7 +3,6 @@ from fastapi import (
HTTPException,
File,
UploadFile,
Form,
BackgroundTasks,
)
import asyncio
@@ -14,7 +13,7 @@ import re
from fastapi.staticfiles import StaticFiles
import logging
import argparse
from typing import List, Any, Optional, Union, Dict
from typing import List, Any, Optional, Dict
from pydantic import BaseModel
from lightrag import LightRAG, QueryParam
from lightrag.types import GPTKeywordExtractionFormat
@@ -34,6 +33,9 @@ from starlette.status import HTTP_403_FORBIDDEN
import pipmaster as pm
from dotenv import load_dotenv
import configparser
import traceback
from datetime import datetime
from lightrag.utils import logger
from .ollama_api import (
OllamaAPI,
@@ -159,8 +161,12 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.host}")
ASCIIColors.white(" ├─ Port: ", end="")
ASCIIColors.yellow(f"{args.port}")
ASCIIColors.white(" SSL Enabled: ", end="")
ASCIIColors.white(" CORS Origins: ", end="")
ASCIIColors.yellow(f"{os.getenv('CORS_ORIGINS', '*')}")
ASCIIColors.white(" ├─ SSL Enabled: ", end="")
ASCIIColors.yellow(f"{args.ssl}")
ASCIIColors.white(" └─ API Key: ", end="")
ASCIIColors.yellow("Set" if args.key else "Not Set")
if args.ssl:
ASCIIColors.white(" ├─ SSL Cert: ", end="")
ASCIIColors.yellow(f"{args.ssl_certfile}")
@@ -229,10 +235,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{ollama_server_infos.LIGHTRAG_MODEL}")
ASCIIColors.white(" ├─ Log Level: ", end="")
ASCIIColors.yellow(f"{args.log_level}")
ASCIIColors.white(" ─ Timeout: ", end="")
ASCIIColors.white(" ─ Timeout: ", end="")
ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}")
ASCIIColors.white(" └─ API Key: ", end="")
ASCIIColors.yellow("Set" if args.key else "Not Set")
# Server Status
ASCIIColors.green("\n✨ Server starting up...\n")
@@ -564,6 +568,10 @@ def parse_args() -> argparse.Namespace:
args = parser.parse_args()
# convert relative paths to absolute paths
args.working_dir = os.path.abspath(args.working_dir)
args.input_dir = os.path.abspath(args.input_dir)
ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name
return args
@@ -595,6 +603,7 @@ class DocumentManager:
"""Scan input directory for new files"""
new_files = []
for ext in self.supported_extensions:
logger.info(f"Scanning for {ext} files in {self.input_dir}")
for file_path in self.input_dir.rglob(f"*{ext}"):
if file_path not in self.indexed_files:
new_files.append(file_path)
@@ -628,9 +637,47 @@ class SearchMode(str, Enum):
class QueryRequest(BaseModel):
query: str
"""Specifies the retrieval mode"""
mode: SearchMode = SearchMode.hybrid
stream: bool = False
only_need_context: bool = False
"""If True, enables streaming output for real-time responses."""
stream: Optional[bool] = None
"""If True, only returns the retrieved context without generating a response."""
only_need_context: Optional[bool] = None
"""If True, only returns the generated prompt without producing a response."""
only_need_prompt: Optional[bool] = None
"""Defines the response format. Examples: 'Multiple Paragraphs', 'Single Paragraph', 'Bullet Points'."""
response_type: Optional[str] = None
"""Number of top items to retrieve. Represents entities in 'local' mode and relationships in 'global' mode."""
top_k: Optional[int] = None
"""Maximum number of tokens allowed for each retrieved text chunk."""
max_token_for_text_unit: Optional[int] = None
"""Maximum number of tokens allocated for relationship descriptions in global retrieval."""
max_token_for_global_context: Optional[int] = None
"""Maximum number of tokens allocated for entity descriptions in local retrieval."""
max_token_for_local_context: Optional[int] = None
"""List of high-level keywords to prioritize in retrieval."""
hl_keywords: Optional[List[str]] = None
"""List of low-level keywords to refine retrieval focus."""
ll_keywords: Optional[List[str]] = None
"""Stores past conversation history to maintain context.
Format: [{"role": "user/assistant", "content": "message"}].
"""
conversation_history: Optional[List[dict[str, Any]]] = None
"""Number of complete conversation turns (user-assistant pairs) to consider in the response context."""
history_turns: Optional[int] = None
class QueryResponse(BaseModel):
@@ -639,13 +686,38 @@ class QueryResponse(BaseModel):
class InsertTextRequest(BaseModel):
text: str
description: Optional[str] = None
class InsertResponse(BaseModel):
status: str
message: str
document_count: int
def QueryRequestToQueryParams(request: QueryRequest):
param = QueryParam(mode=request.mode, stream=request.stream)
if request.only_need_context is not None:
param.only_need_context = request.only_need_context
if request.only_need_prompt is not None:
param.only_need_prompt = request.only_need_prompt
if request.response_type is not None:
param.response_type = request.response_type
if request.top_k is not None:
param.top_k = request.top_k
if request.max_token_for_text_unit is not None:
param.max_token_for_text_unit = request.max_token_for_text_unit
if request.max_token_for_global_context is not None:
param.max_token_for_global_context = request.max_token_for_global_context
if request.max_token_for_local_context is not None:
param.max_token_for_local_context = request.max_token_for_local_context
if request.hl_keywords is not None:
param.hl_keywords = request.hl_keywords
if request.ll_keywords is not None:
param.ll_keywords = request.ll_keywords
if request.conversation_history is not None:
param.conversation_history = request.conversation_history
if request.history_turns is not None:
param.history_turns = request.history_turns
return param
def get_api_key_dependency(api_key: Optional[str]):
@@ -659,7 +731,9 @@ def get_api_key_dependency(api_key: Optional[str]):
# If API key is configured, use proper authentication
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def api_key_auth(api_key_header_value: str | None = Security(api_key_header)):
async def api_key_auth(
api_key_header_value: Optional[str] = Security(api_key_header),
):
if not api_key_header_value:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="API Key required"
@@ -675,6 +749,7 @@ def get_api_key_dependency(api_key: Optional[str]):
# Global configuration
global_top_k = 60 # default value
temp_prefix = "__tmp_" # prefix for temporary files
def create_app(args):
@@ -842,10 +917,19 @@ def create_app(args):
lifespan=lifespan,
)
def get_cors_origins():
"""Get allowed origins from environment variable
Returns a list of allowed origins, defaults to ["*"] if not set
"""
origins_str = os.getenv("CORS_ORIGINS", "*")
if origins_str == "*":
return ["*"]
return [origin.strip() for origin in origins_str.split(",")]
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_origins=get_cors_origins(),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
@@ -1116,61 +1200,194 @@ def create_app(args):
("llm_response_cache", rag.llm_response_cache),
]
async def index_file(file_path: Union[str, Path]) -> None:
"""Index all files inside the folder with support for multiple file formats
async def pipeline_enqueue_file(file_path: Path) -> bool:
"""Add a file to the queue for processing
Args:
file_path: Path to the file to be indexed (str or Path object)
Raises:
ValueError: If file format is not supported
FileNotFoundError: If file doesn't exist
file_path: Path to the saved file
Returns:
bool: True if the file was successfully enqueued, False otherwise
"""
if not pm.is_installed("aiofiles"):
pm.install("aiofiles")
try:
content = ""
ext = file_path.suffix.lower()
# Convert to Path object if string
file_path = Path(file_path)
file = None
async with aiofiles.open(file_path, "rb") as f:
file = await f.read()
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Process based on file type
match ext:
case ".txt" | ".md":
content = file.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from PyPDF2 import PdfReader
from io import BytesIO
content = ""
# Get file extension in lowercase
ext = file_path.suffix.lower()
pdf_file = BytesIO(file)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
match ext:
case ".txt" | ".md":
# Text files handling
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation # type: ignore
from io import BytesIO
case ".pdf" | ".docx" | ".pptx" | ".xlsx":
if not pm.is_installed("docling"):
pm.install("docling")
from docling.document_converter import DocumentConverter
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
logging.error(
f"Unsupported file type: {file_path.name} (extension {ext})"
)
return False
async def convert_doc():
def sync_convert():
converter = DocumentConverter()
result = converter.convert(file_path)
return result.document.export_to_markdown()
# Insert into the RAG queue
if content:
await rag.apipeline_enqueue_documents(content)
logging.info(
f"Successfully processed and enqueued file: {file_path.name}"
)
return True
else:
logging.error(
f"No content could be extracted from file: {file_path.name}"
)
return await asyncio.to_thread(sync_convert)
except Exception as e:
logging.error(
f"Error processing or enqueueing file {file_path.name}: {str(e)}"
)
logging.error(traceback.format_exc())
finally:
if file_path.name.startswith(temp_prefix):
# Clean up the temporary file after indexing
try:
file_path.unlink()
except Exception as e:
logging.error(f"Error deleting file {file_path}: {str(e)}")
return False
content = await convert_doc()
async def pipeline_index_file(file_path: Path):
"""Index a file
case _:
raise ValueError(f"Unsupported file format: {ext}")
Args:
file_path: Path to the saved file
"""
try:
if await pipeline_enqueue_file(file_path):
await rag.apipeline_process_enqueue_documents()
# Insert content into RAG system
if content:
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Successfully indexed file: {file_path}")
else:
logging.warning(f"No content extracted from file: {file_path}")
except Exception as e:
logging.error(f"Error indexing file {file_path.name}: {str(e)}")
logging.error(traceback.format_exc())
async def pipeline_index_files(file_paths: List[Path]):
"""Index multiple files concurrently
Args:
file_paths: Paths to the files to index
"""
if not file_paths:
return
try:
enqueued = False
if len(file_paths) == 1:
enqueued = await pipeline_enqueue_file(file_paths[0])
else:
tasks = [pipeline_enqueue_file(path) for path in file_paths]
enqueued = any(await asyncio.gather(*tasks))
if enqueued:
await rag.apipeline_process_enqueue_documents()
except Exception as e:
logging.error(f"Error indexing files: {str(e)}")
logging.error(traceback.format_exc())
async def pipeline_index_texts(texts: List[str]):
"""Index a list of texts
Args:
texts: The texts to index
"""
if not texts:
return
await rag.apipeline_enqueue_documents(texts)
await rag.apipeline_process_enqueue_documents()
async def save_temp_file(file: UploadFile = File(...)) -> Path:
"""Save the uploaded file to a temporary location
Args:
file: The uploaded file
Returns:
Path: The path to the saved file
"""
# Generate unique filename to avoid conflicts
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_filename = f"{temp_prefix}{timestamp}_{file.filename}"
# Create a temporary file to save the uploaded content
temp_path = doc_manager.input_dir / "temp" / unique_filename
temp_path.parent.mkdir(exist_ok=True)
# Save the file
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return temp_path
async def run_scanning_process():
"""Background task to scan and index documents"""
global scan_progress
try:
new_files = doc_manager.scan_directory_for_new_files()
scan_progress["total_files"] = len(new_files)
logger.info(f"Found {len(new_files)} new files to index.")
for file_path in new_files:
try:
with progress_lock:
scan_progress["current_file"] = os.path.basename(file_path)
await pipeline_index_file(file_path)
with progress_lock:
scan_progress["indexed_count"] += 1
scan_progress["progress"] = (
scan_progress["indexed_count"]
/ scan_progress["total_files"]
) * 100
except Exception as e:
logging.error(f"Error indexing file {file_path}: {str(e)}")
except Exception as e:
logging.error(f"Error during scanning process: {str(e)}")
finally:
with progress_lock:
scan_progress["is_scanning"] = False
@app.post("/documents/scan", dependencies=[Depends(optional_api_key)])
async def scan_for_new_documents(background_tasks: BackgroundTasks):
@@ -1190,37 +1407,6 @@ def create_app(args):
return {"status": "scanning_started"}
async def run_scanning_process():
"""Background task to scan and index documents"""
global scan_progress
try:
new_files = doc_manager.scan_directory_for_new_files()
scan_progress["total_files"] = len(new_files)
for file_path in new_files:
try:
with progress_lock:
scan_progress["current_file"] = os.path.basename(file_path)
await index_file(file_path)
with progress_lock:
scan_progress["indexed_count"] += 1
scan_progress["progress"] = (
scan_progress["indexed_count"]
/ scan_progress["total_files"]
) * 100
except Exception as e:
logging.error(f"Error indexing file {file_path}: {str(e)}")
except Exception as e:
logging.error(f"Error during scanning process: {str(e)}")
finally:
with progress_lock:
scan_progress["is_scanning"] = False
@app.get("/documents/scan-progress")
async def get_scan_progress():
"""Get the current scanning progress"""
@@ -1228,7 +1414,9 @@ def create_app(args):
return scan_progress
@app.post("/documents/upload", dependencies=[Depends(optional_api_key)])
async def upload_to_input_dir(file: UploadFile = File(...)):
async def upload_to_input_dir(
background_tasks: BackgroundTasks, file: UploadFile = File(...)
):
"""
Endpoint for uploading a file to the input directory and indexing it.
@@ -1237,6 +1425,7 @@ def create_app(args):
indexes it for retrieval, and returns a success status with relevant details.
Parameters:
background_tasks: FastAPI BackgroundTasks for async processing
file (UploadFile): The file to be uploaded. It must have an allowed extension as per
`doc_manager.supported_extensions`.
@@ -1261,15 +1450,175 @@ def create_app(args):
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Immediately index the uploaded file
await index_file(file_path)
# Add to background tasks
background_tasks.add_task(pipeline_index_file, file_path)
return {
"status": "success",
"message": f"File uploaded and indexed: {file.filename}",
"total_documents": len(doc_manager.indexed_files),
}
return InsertResponse(
status="success",
message=f"File '{file.filename}' uploaded successfully. Processing will continue in background.",
)
except Exception as e:
logging.error(f"Error /documents/upload: {file.filename}: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/text",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_text(
request: InsertTextRequest, background_tasks: BackgroundTasks
):
"""
Insert text into the Retrieval-Augmented Generation (RAG) system.
This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses.
Args:
request (InsertTextRequest): The request body containing the text to be inserted.
background_tasks: FastAPI BackgroundTasks for async processing
Returns:
InsertResponse: A response object containing the status of the operation and a message.
"""
try:
background_tasks.add_task(pipeline_index_texts, [request.text])
return InsertResponse(
status="success",
message="Text successfully received. Processing will continue in background.",
)
except Exception as e:
logging.error(f"Error /documents/text: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/file",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_file(
background_tasks: BackgroundTasks, file: UploadFile = File(...)
):
"""Insert a file directly into the RAG system
Args:
background_tasks: FastAPI BackgroundTasks for async processing
file: Uploaded file
Returns:
InsertResponse: Status of the insertion operation
Raises:
HTTPException: For unsupported file types or processing errors
"""
try:
if not doc_manager.is_supported_file(file.filename):
raise HTTPException(
status_code=400,
detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
)
# Create a temporary file to save the uploaded content
temp_path = await save_temp_file(file)
# Add to background tasks
background_tasks.add_task(pipeline_index_file, temp_path)
return InsertResponse(
status="success",
message=f"File '{file.filename}' saved successfully. Processing will continue in background.",
)
except Exception as e:
logging.error(f"Error /documents/file: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/batch",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_batch(
background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)
):
"""Process multiple files in batch mode
Args:
background_tasks: FastAPI BackgroundTasks for async processing
files: List of files to process
Returns:
InsertResponse: Status of the batch insertion operation
Raises:
HTTPException: For processing errors
"""
try:
inserted_count = 0
failed_files = []
temp_files = []
for file in files:
if doc_manager.is_supported_file(file.filename):
# Create a temporary file to save the uploaded content
temp_files.append(await save_temp_file(file))
inserted_count += 1
else:
failed_files.append(f"{file.filename} (unsupported type)")
if temp_files:
background_tasks.add_task(pipeline_index_files, temp_files)
# Prepare status message
if inserted_count == len(files):
status = "success"
status_message = f"Successfully inserted all {inserted_count} documents"
elif inserted_count > 0:
status = "partial_success"
status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
else:
status = "failure"
status_message = "No documents were successfully inserted"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
return InsertResponse(status=status, message=status_message)
except Exception as e:
logging.error(f"Error /documents/batch: {file.filename}: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.delete(
"/documents",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def clear_documents():
"""
Clear all documents from the LightRAG system.
This endpoint deletes all text chunks, entities vector database, and relationships vector database,
effectively clearing all documents from the LightRAG system.
Returns:
InsertResponse: A response object containing the status and a message.
"""
try:
rag.text_chunks = []
rag.entities_vdb = None
rag.relationships_vdb = None
return InsertResponse(
status="success", message="All documents cleared successfully"
)
except Exception as e:
logging.error(f"Error DELETE /documents: {str(e)}")
logging.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@app.post(
@@ -1280,12 +1629,7 @@ def create_app(args):
Handle a POST request at the /query endpoint to process user queries using RAG capabilities.
Parameters:
request (QueryRequest): A Pydantic model containing the following fields:
- query (str): The text of the user's query.
- mode (ModeEnum): Optional. Specifies the mode of retrieval augmentation.
- stream (bool): Optional. Determines if the response should be streamed.
- only_need_context (bool): Optional. If true, returns only the context without further processing.
request (QueryRequest): The request object containing the query parameters.
Returns:
QueryResponse: A Pydantic model containing the result of the query processing.
If a string is returned (e.g., cache hit), it's directly returned.
@@ -1297,13 +1641,7 @@ def create_app(args):
"""
try:
response = await rag.aquery(
request.query,
param=QueryParam(
mode=request.mode,
stream=request.stream,
only_need_context=request.only_need_context,
top_k=global_top_k,
),
request.query, param=QueryRequestToQueryParams(request)
)
# If response is a string (e.g. cache hit), return directly
@@ -1311,16 +1649,16 @@ def create_app(args):
return QueryResponse(response=response)
# If it's an async generator, decide whether to stream based on stream parameter
if request.stream:
if request.stream or hasattr(response, "__aiter__"):
result = ""
async for chunk in response:
result += chunk
return QueryResponse(response=result)
elif isinstance(response, dict):
result = json.dumps(response, indent=2)
return QueryResponse(response=result)
else:
result = ""
async for chunk in response:
result += chunk
return QueryResponse(response=result)
return QueryResponse(response=str(response))
except Exception as e:
trace_exception(e)
raise HTTPException(status_code=500, detail=str(e))
@@ -1338,14 +1676,11 @@ def create_app(args):
StreamingResponse: A streaming response containing the RAG query results.
"""
try:
params = QueryRequestToQueryParams(request)
params.stream = True
response = await rag.aquery( # Use aquery instead of query, and add await
request.query,
param=QueryParam(
mode=request.mode,
stream=True,
only_need_context=request.only_need_context,
top_k=global_top_k,
),
request.query, param=params
)
from fastapi.responses import StreamingResponse
@@ -1371,265 +1706,13 @@ def create_app(args):
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"X-Accel-Buffering": "no", # Disable Nginx buffering
"X-Accel-Buffering": "no", # 确保在Nginx代理时正确处理流式响应
},
)
except Exception as e:
trace_exception(e)
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/text",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_text(request: InsertTextRequest):
"""
Insert text into the Retrieval-Augmented Generation (RAG) system.
This endpoint allows you to insert text data into the RAG system for later retrieval and use in generating responses.
Args:
request (InsertTextRequest): The request body containing the text to be inserted.
Returns:
InsertResponse: A response object containing the status of the operation, a message, and the number of documents inserted.
"""
try:
await rag.ainsert(request.text)
return InsertResponse(
status="success",
message="Text successfully inserted",
document_count=1,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/file",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
"""Insert a file directly into the RAG system
Args:
file: Uploaded file
description: Optional description of the file
Returns:
InsertResponse: Status of the insertion operation
Raises:
HTTPException: For unsupported file types or processing errors
"""
try:
content = ""
# Get file extension in lowercase
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf" | ".docx" | ".pptx" | ".xlsx":
if not pm.is_installed("docling"):
pm.install("docling")
from docling.document_converter import DocumentConverter
# Create a temporary file to save the uploaded content
temp_path = Path("temp") / file.filename
temp_path.parent.mkdir(exist_ok=True)
# Save the uploaded file
with temp_path.open("wb") as f:
f.write(await file.read())
try:
async def convert_doc():
def sync_convert():
converter = DocumentConverter()
result = converter.convert(str(temp_path))
return result.document.export_to_markdown()
return await asyncio.to_thread(sync_convert)
content = await convert_doc()
finally:
# Clean up the temporary file
temp_path.unlink()
# Insert content into RAG system
if content:
# Add description if provided
if description:
content = f"{description}\n\n{content}"
await rag.ainsert(content)
logging.info(f"Successfully indexed file: {file.filename}")
return InsertResponse(
status="success",
message=f"File '{file.filename}' successfully inserted",
document_count=1,
)
else:
raise HTTPException(
status_code=400,
detail="No content could be extracted from the file",
)
except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File encoding not supported")
except Exception as e:
logging.error(f"Error processing file {file.filename}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/documents/batch",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def insert_batch(files: List[UploadFile] = File(...)):
"""Process multiple files in batch mode
Args:
files: List of files to process
Returns:
InsertResponse: Status of the batch insertion operation
Raises:
HTTPException: For processing errors
"""
try:
inserted_count = 0
failed_files = []
for file in files:
try:
content = ""
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from PyPDF2 import PdfReader
from io import BytesIO
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation # type: ignore
from io import BytesIO
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
failed_files.append(f"{file.filename} (unsupported type)")
continue
if content:
await rag.ainsert(content)
inserted_count += 1
logging.info(f"Successfully indexed file: {file.filename}")
else:
failed_files.append(f"{file.filename} (no content extracted)")
except UnicodeDecodeError:
failed_files.append(f"{file.filename} (encoding error)")
except Exception as e:
failed_files.append(f"{file.filename} ({str(e)})")
logging.error(f"Error processing file {file.filename}: {str(e)}")
# Prepare status message
if inserted_count == len(files):
status = "success"
status_message = f"Successfully inserted all {inserted_count} documents"
elif inserted_count > 0:
status = "partial_success"
status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
else:
status = "failure"
status_message = "No documents were successfully inserted"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
return InsertResponse(
status=status,
message=status_message,
document_count=inserted_count,
)
except Exception as e:
logging.error(f"Batch processing error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete(
"/documents",
response_model=InsertResponse,
dependencies=[Depends(optional_api_key)],
)
async def clear_documents():
"""
Clear all documents from the LightRAG system.
This endpoint deletes all text chunks, entities vector database, and relationships vector database,
effectively clearing all documents from the LightRAG system.
Returns:
InsertResponse: A response object containing the status, message, and the new document count (0 in this case).
"""
try:
rag.text_chunks = []
rag.entities_vdb = None
rag.relationships_vdb = None
return InsertResponse(
status="success",
message="All documents cleared successfully",
document_count=0,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# query all graph labels
@app.get("/graph/label/list")
async def get_graph_labels():

View File

@@ -316,9 +316,7 @@ class OllamaAPI:
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"X-Accel-Buffering": "no", # 确保在Nginx代理时正确处理流式响应
},
)
else:
@@ -534,9 +532,7 @@ class OllamaAPI:
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/x-ndjson",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type",
"X-Accel-Buffering": "no", # 确保在Nginx代理时正确处理流式响应
},
)
else:

View File

@@ -39,8 +39,8 @@ class FaissVectorDBStorage(BaseVectorStorage):
def __post_init__(self):
# Grab config values if available
config = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = config.get("cosine_better_than_threshold")
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:
raise ValueError(
"cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"

View File

@@ -185,7 +185,7 @@ class MongoDocStatusStorage(DocStatusStorage):
async def get_docs_by_status(
self, status: DocStatus
) -> dict[str, DocProcessingStatus]:
"""Get all documents by status"""
"""Get all documents with a specific status"""
cursor = self._data.find({"status": status.value})
result = await cursor.to_list()
return {

View File

@@ -34,8 +34,8 @@ class NanoVectorDBStorage(BaseVectorStorage):
# Initialize lock only for file operations
self._save_lock = asyncio.Lock()
# Use global config value if specified, otherwise use default
config = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = config.get("cosine_better_than_threshold")
kwargs = self.global_config.get("vector_db_storage_cls_kwargs", {})
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:
raise ValueError(
"cosine_better_than_threshold must be specified in vector_db_storage_cls_kwargs"

View File

@@ -482,7 +482,7 @@ class PGDocStatusStorage(DocStatusStorage):
async def get_docs_by_status(
self, status: DocStatus
) -> Dict[str, DocProcessingStatus]:
"""Get all documents by status"""
"""all documents with a specific status"""
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
params = {"workspace": self.db.workspace, "status": status}
result = await self.db.query(sql, params, True)

View File

@@ -89,7 +89,7 @@ STORAGE_IMPLEMENTATIONS = {
"PGDocStatusStorage",
"MongoDocStatusStorage",
],
"required_methods": ["get_pending_docs"],
"required_methods": ["get_docs_by_status"],
},
}
@@ -230,7 +230,7 @@ class LightRAG:
"""LightRAG: Simple and Fast Retrieval-Augmented Generation."""
working_dir: str = field(
default_factory=lambda: f'./lightrag_cache_{datetime.now().strftime("%Y-%m-%d-%H:%M:%S")}'
default_factory=lambda: f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}"
)
"""Directory where cache and temporary files are stored."""
@@ -715,11 +715,11 @@ class LightRAG:
# 1. Get all pending, failed, and abnormally terminated processing documents.
to_process_docs: dict[str, DocProcessingStatus] = {}
processing_docs = await self.doc_status.get_processing_docs()
processing_docs = await self.doc_status.get_docs_by_status(DocStatus.PROCESSING)
to_process_docs.update(processing_docs)
failed_docs = await self.doc_status.get_failed_docs()
failed_docs = await self.doc_status.get_docs_by_status(DocStatus.FAILED)
to_process_docs.update(failed_docs)
pendings_docs = await self.doc_status.get_pending_docs()
pendings_docs = await self.doc_status.get_docs_by_status(DocStatus.PENDING)
to_process_docs.update(pendings_docs)
if not to_process_docs: