From 58e74d5fb25f0003d08edd701330d4b340ab2e58 Mon Sep 17 00:00:00 2001 From: Luca Congiu Date: Mon, 23 Dec 2024 14:34:34 +0100 Subject: [PATCH] Added Azure OpenAI api sample with streaming --- .gitignore | 4 + api/.env.aoi.example | 7 + api/README_AZURE_OPENAI.md | 183 ++++++++++++ api/azure_openai_lightrag_server.py | 437 ++++++++++++++++++++++++++++ api/requirements.txt | 13 + examples/.env.oai.example | 7 + lightrag/llm.py | 32 +- 7 files changed, 678 insertions(+), 5 deletions(-) create mode 100644 api/.env.aoi.example create mode 100644 api/README_AZURE_OPENAI.md create mode 100644 api/azure_openai_lightrag_server.py create mode 100644 examples/.env.oai.example diff --git a/.gitignore b/.gitignore index f038ef5d..7b7aaad5 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,7 @@ ignore_this.txt gui/ *.log .vscode +.env +venv/ +examples/input/ +examples/output/ \ No newline at end of file diff --git a/api/.env.aoi.example b/api/.env.aoi.example new file mode 100644 index 00000000..288d7ff3 --- /dev/null +++ b/api/.env.aoi.example @@ -0,0 +1,7 @@ +AZURE_OPENAI_API_VERSION=2024-08-01-preview +AZURE_OPENAI_DEPLOYMENT=gpt-4o +AZURE_OPENAI_API_KEY=myapikey +AZURE_OPENAI_ENDPOINT=https://myendpoint.openai.azure.com + +AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large +AZURE_EMBEDDING_API_VERSION=2023-05-15 \ No newline at end of file diff --git a/api/README_AZURE_OPENAI.md b/api/README_AZURE_OPENAI.md new file mode 100644 index 00000000..d48ae89a --- /dev/null +++ b/api/README_AZURE_OPENAI.md @@ -0,0 +1,183 @@ + +# LightRAG API Server + +A powerful FastAPI-based server for managing and querying documents using LightRAG (Light Retrieval-Augmented Generation). This server provides a REST API interface for document management and intelligent querying using OpenAI's language models. + +## Features + +- 🔍 Multiple search modes (naive, local, global, hybrid) +- 📡 Streaming and non-streaming responses +- 📝 Document management (insert, batch upload, clear) +- ⚙️ Highly configurable model parameters +- 📚 Support for text and file uploads +- 🔧 RESTful API with automatic documentation +- 🚀 Built with FastAPI for high performance + +## Prerequisites + +- Python 3.8+ +- Azure OpenAI API key +- Azure OpenAI Deployments (gpt-4o, text-embedding-3-large) +- Required Python packages: + - fastapi + - uvicorn + - lightrag + - pydantic + - openai + - nest-asyncio + +## Installation +If you are using Windows, you will need to download and install visual c++ build tools from [https://visualstudio.microsoft.com/visual-cpp-build-tools/](https://visualstudio.microsoft.com/visual-cpp-build-tools/) +Make sure you install the VS 2022 C++ x64/x86 Build tools from individual components tab. + +1. Clone the repository: +```bash +git clone https://github.com/ParisNeo/LightRAG.git +cd api +``` + +2. Install dependencies: +```bash +python -m venv venv +source venv/bin/activate +#venv\Scripts\activate for Windows +pip install -r requirements.txt +``` + +3. 
Set up environment variables: + use the `.env` file to set the environment variables (you can copy the `.env.aoi.example` file and rename it to `.env`), + or set them manually: +```bash +export AZURE_OPENAI_API_VERSION='2024-08-01-preview' +export AZURE_OPENAI_DEPLOYMENT='gpt-4o' +export AZURE_OPENAI_API_KEY='myapikey' +export AZURE_OPENAI_ENDPOINT='https://myendpoint.openai.azure.com' +export AZURE_EMBEDDING_DEPLOYMENT='text-embedding-3-large' +export AZURE_EMBEDDING_API_VERSION='2023-05-15' +``` + +## Configuration + +The server can be configured using command-line arguments: + +```bash +python azure_openai_lightrag_server.py --help +``` + +Available options: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| --host | 0.0.0.0 | Server host | +| --port | 9621 | Server port | +| --model | gpt-4 | OpenAI model name | +| --embedding-model | text-embedding-3-large | OpenAI embedding model | +| --working-dir | ./rag_storage | Working directory for RAG | +| --max-tokens | 32768 | Maximum token size | +| --max-embed-tokens | 8192 | Maximum embedding token size | +| --input-dir | ./inputs | Input directory for documents | +| --enable-cache | True | Enable response cache | +| --log-level | INFO | Logging level | + +## Quick Start + +1. Basic usage with default settings: +```bash +python azure_openai_lightrag_server.py +``` + +2. Custom configuration: +```bash +python azure_openai_lightrag_server.py --model gpt-4o --port 8080 --working-dir ./custom_rag +``` + +## API Endpoints + +### Query Endpoints + +#### POST /query +Query the RAG system with options for different search modes. + +```bash +curl -X POST "http://localhost:9621/query" \ + -H "Content-Type: application/json" \ + -d '{"query": "Your question here", "mode": "hybrid"}' +``` + +#### POST /query/stream +Stream responses from the RAG system. + +```bash +curl -X POST "http://localhost:9621/query/stream" \ + -H "Content-Type: application/json" \ + -d '{"query": "Your question here", "mode": "hybrid"}' +``` + +### Document Management Endpoints + +#### POST /documents/text +Insert text directly into the RAG system. + +```bash +curl -X POST "http://localhost:9621/documents/text" \ + -H "Content-Type: application/json" \ + -d '{"text": "Your text content here", "description": "Optional description"}' +``` + +#### POST /documents/file +Upload a single file to the RAG system. + +```bash +curl -X POST "http://localhost:9621/documents/file" \ + -F "file=@/path/to/your/document.txt" \ + -F "description=Optional description" +``` + +#### POST /documents/batch +Upload multiple files at once. + +```bash +curl -X POST "http://localhost:9621/documents/batch" \ + -F "files=@/path/to/doc1.txt" \ + -F "files=@/path/to/doc2.txt" +``` + +#### DELETE /documents +Clear all documents from the RAG system. + +```bash +curl -X DELETE "http://localhost:9621/documents" +``` + +### Utility Endpoints + +#### GET /health +Check server health and configuration. + +```bash +curl "http://localhost:9621/health" +``` + +## Development + +### Running in Development Mode + +```bash +uvicorn azure_openai_lightrag_server:app --reload --port 9621 +``` + +### API Documentation + +When the server is running, visit: +- Swagger UI: http://localhost:9621/docs +- ReDoc: http://localhost:9621/redoc + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. 
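+
+## Streaming Client Example
+
+The `/query/stream` endpoint emits newline-delimited JSON, one object of the form `{"data": "<partial response>"}` per line. A minimal client sketch for consuming it (assuming the server runs on the default port and the `requests` package is installed) could look like this:
+
+```python
+import json
+
+import requests  # any HTTP client that supports response streaming works
+
+url = "http://localhost:9621/query/stream"
+payload = {"query": "Your question here", "mode": "hybrid"}
+
+with requests.post(url, json=payload, stream=True) as resp:
+    resp.raise_for_status()
+    # The server yields one JSON object per line: {"data": "<chunk of the answer>"}
+    for line in resp.iter_lines(decode_unicode=True):
+        if line:
+            print(json.loads(line)["data"], end="", flush=True)
+```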
+ +## Acknowledgments + +- Built with [FastAPI](https://fastapi.tiangolo.com/) +- Uses [LightRAG](https://github.com/HKUDS/LightRAG) for document processing +- Powered by [OpenAI](https://openai.com/) for language model inference diff --git a/api/azure_openai_lightrag_server.py b/api/azure_openai_lightrag_server.py new file mode 100644 index 00000000..26dc3a61 --- /dev/null +++ b/api/azure_openai_lightrag_server.py @@ -0,0 +1,437 @@ +from fastapi import FastAPI, HTTPException, File, UploadFile, Form +from pydantic import BaseModel +import asyncio +import logging +import argparse +from lightrag import LightRAG, QueryParam +from lightrag.llm import azure_openai_complete_if_cache, azure_openai_complete, azure_openai_embedding +from lightrag.utils import EmbeddingFunc +from typing import Optional, List +from enum import Enum +from pathlib import Path +import shutil +import aiofiles +from ascii_colors import trace_exception +import os +from dotenv import load_dotenv +import inspect +import json +from fastapi.responses import StreamingResponse + +load_dotenv() + +AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") +AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT") +AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") +AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") + +AZURE_EMBEDDING_DEPLOYMENT = os.getenv("AZURE_EMBEDDING_DEPLOYMENT") +AZURE_EMBEDDING_API_VERSION = os.getenv("AZURE_EMBEDDING_API_VERSION") + +def parse_args(): + parser = argparse.ArgumentParser( + description="LightRAG FastAPI Server with OpenAI integration" + ) + + # Server configuration + parser.add_argument( + "--host", default="0.0.0.0", help="Server host (default: 0.0.0.0)" + ) + parser.add_argument( + "--port", type=int, default=9621, help="Server port (default: 9621)" + ) + + # Directory configuration + parser.add_argument( + "--working-dir", + default="./rag_storage", + help="Working directory for RAG storage (default: ./rag_storage)", + ) + parser.add_argument( + "--input-dir", + default="./inputs", + help="Directory containing input documents (default: ./inputs)", + ) + + # Model configuration + parser.add_argument( + "--model", default="gpt-4o", help="OpenAI model name (default: gpt-4o)" + ) + parser.add_argument( + "--embedding-model", + default="text-embedding-3-large", + help="OpenAI embedding model (default: text-embedding-3-large)", + ) + + # RAG configuration + parser.add_argument( + "--max-tokens", + type=int, + default=32768, + help="Maximum token size (default: 32768)", + ) + parser.add_argument( + "--max-embed-tokens", + type=int, + default=8192, + help="Maximum embedding token size (default: 8192)", + ) + parser.add_argument( + "--enable-cache", + default=True, + help="Enable response cache (default: True)", + ) + # Logging configuration + parser.add_argument( + "--log-level", + default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Logging level (default: INFO)", + ) + + return parser.parse_args() + + +class DocumentManager: + """Handles document operations and tracking""" + + def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md")): + self.input_dir = Path(input_dir) + self.supported_extensions = supported_extensions + self.indexed_files = set() + + # Create input directory if it doesn't exist + self.input_dir.mkdir(parents=True, exist_ok=True) + + def scan_directory(self) -> List[Path]: + """Scan input directory for new files""" + new_files = [] + for ext in self.supported_extensions: + for file_path in 
self.input_dir.rglob(f"*{ext}"): + if file_path not in self.indexed_files: + new_files.append(file_path) + return new_files + + def mark_as_indexed(self, file_path: Path): + """Mark a file as indexed""" + self.indexed_files.add(file_path) + + def is_supported_file(self, filename: str) -> bool: + """Check if file type is supported""" + return any(filename.lower().endswith(ext) for ext in self.supported_extensions) + + +# Pydantic models +class SearchMode(str, Enum): + naive = "naive" + local = "local" + global_ = "global" + hybrid = "hybrid" + + +class QueryRequest(BaseModel): + query: str + mode: SearchMode = SearchMode.hybrid + #stream: bool = False + + +class QueryResponse(BaseModel): + response: str + + +class InsertTextRequest(BaseModel): + text: str + description: Optional[str] = None + + +class InsertResponse(BaseModel): + status: str + message: str + document_count: int + + +async def get_embedding_dim(embedding_model: str) -> int: + """Get embedding dimensions for the specified model""" + test_text = ["This is a test sentence."] + embedding = await azure_openai_embedding(test_text, model=embedding_model) + return embedding.shape[1] + + +def create_app(args): + # Setup logging + logging.basicConfig( + format="%(levelname)s:%(message)s", level=getattr(logging, args.log_level) + ) + + # Initialize FastAPI app + app = FastAPI( + title="LightRAG API", + description="API for querying text using LightRAG with OpenAI integration", + ) + + # Create working directory if it doesn't exist + Path(args.working_dir).mkdir(parents=True, exist_ok=True) + + # Initialize document manager + doc_manager = DocumentManager(args.input_dir) + + # Get embedding dimensions + embedding_dim = asyncio.run(get_embedding_dim(args.embedding_model)) + + async def async_openai_complete( + prompt, system_prompt=None, history_messages=[], **kwargs + ): + """Async wrapper for OpenAI completion""" + keyword_extraction = kwargs.pop("keyword_extraction", None) + + return await azure_openai_complete_if_cache( + args.model, + prompt, + system_prompt=system_prompt, + history_messages=history_messages, + base_url=AZURE_OPENAI_ENDPOINT, + api_key=AZURE_OPENAI_API_KEY, + api_version=AZURE_OPENAI_API_VERSION, + **kwargs, + ) + + # Initialize RAG with OpenAI configuration + rag = LightRAG( + enable_llm_cache=args.enable_cache, + working_dir=args.working_dir, + llm_model_func=async_openai_complete, + llm_model_name=args.model, + llm_model_max_token_size=args.max_tokens, + embedding_func=EmbeddingFunc( + embedding_dim=embedding_dim, + max_token_size=args.max_embed_tokens, + func=lambda texts: azure_openai_embedding(texts, model=args.embedding_model), + ), + ) + + + @app.on_event("startup") + async def startup_event(): + """Index all files in input directory during startup""" + try: + new_files = doc_manager.scan_directory() + for file_path in new_files: + try: + # Use async file reading + async with aiofiles.open(file_path, "r", encoding="utf-8") as f: + content = await f.read() + # Use the async version of insert directly + await rag.ainsert(content) + doc_manager.mark_as_indexed(file_path) + logging.info(f"Indexed file: {file_path}") + except Exception as e: + trace_exception(e) + logging.error(f"Error indexing file {file_path}: {str(e)}") + + logging.info(f"Indexed {len(new_files)} documents from {args.input_dir}") + + except Exception as e: + logging.error(f"Error during startup indexing: {str(e)}") + + @app.post("/documents/scan") + async def scan_for_new_documents(): + """Manually trigger scanning for new documents""" + try: 
+ new_files = doc_manager.scan_directory() + indexed_count = 0 + + for file_path in new_files: + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + await rag.ainsert(content) + doc_manager.mark_as_indexed(file_path) + indexed_count += 1 + except Exception as e: + logging.error(f"Error indexing file {file_path}: {str(e)}") + + return { + "status": "success", + "indexed_count": indexed_count, + "total_documents": len(doc_manager.indexed_files), + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/resetcache") + async def reset_cache(): + """Manually reset cache""" + try: + cachefile = args.working_dir + "/kv_store_llm_response_cache.json" + if os.path.exists(cachefile): + with open(cachefile, "w") as f: + f.write("{}") + return { + "status": "success" + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/documents/upload") + async def upload_to_input_dir(file: UploadFile = File(...)): + """Upload a file to the input directory""" + try: + if not doc_manager.is_supported_file(file.filename): + raise HTTPException( + status_code=400, + detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}", + ) + + file_path = doc_manager.input_dir / file.filename + with open(file_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + + # Immediately index the uploaded file + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + await rag.ainsert(content) + doc_manager.mark_as_indexed(file_path) + + return { + "status": "success", + "message": f"File uploaded and indexed: {file.filename}", + "total_documents": len(doc_manager.indexed_files), + } + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/query", response_model=QueryResponse) + async def query_text(request: QueryRequest): + try: + response = await rag.aquery( + request.query, + param=QueryParam(mode=request.mode, stream=False), + ) + return QueryResponse(response=response) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/query/stream") + async def query_text_stream(request: QueryRequest): + try: + response = await rag.aquery( + request.query, + param=QueryParam(mode=request.mode, stream=True), + ) + if inspect.isasyncgen(response): + async def stream_generator(): + async for chunk in response: + yield json.dumps({"data": chunk}) + "\n" + + return StreamingResponse(stream_generator(), media_type="application/json") + else: + return QueryResponse(response=response) + + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/documents/text", response_model=InsertResponse) + async def insert_text(request: InsertTextRequest): + try: + rag.insert(request.text) + return InsertResponse( + status="success", + message="Text successfully inserted", + document_count=len(rag), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/documents/file", response_model=InsertResponse) + async def insert_file(file: UploadFile = File(...), description: str = Form(None)): + try: + content = await file.read() + + if file.filename.endswith((".txt", ".md")): + text = content.decode("utf-8") + rag.insert(text) + else: + raise HTTPException( + status_code=400, + detail="Unsupported file type. 
Only .txt and .md files are supported", + ) + + return InsertResponse( + status="success", + message=f"File '{file.filename}' successfully inserted", + document_count=len(rag), + ) + except UnicodeDecodeError: + raise HTTPException(status_code=400, detail="File encoding not supported") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.post("/documents/batch", response_model=InsertResponse) + async def insert_batch(files: List[UploadFile] = File(...)): + try: + inserted_count = 0 + failed_files = [] + + for file in files: + try: + content = await file.read() + if file.filename.endswith((".txt", ".md")): + text = content.decode("utf-8") + rag.insert(text) + inserted_count += 1 + else: + failed_files.append(f"{file.filename} (unsupported type)") + except Exception as e: + failed_files.append(f"{file.filename} ({str(e)})") + + status_message = f"Successfully inserted {inserted_count} documents" + if failed_files: + status_message += f". Failed files: {', '.join(failed_files)}" + + return InsertResponse( + status="success" if inserted_count > 0 else "partial_success", + message=status_message, + document_count=len(rag), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.delete("/documents", response_model=InsertResponse) + async def clear_documents(): + try: + rag.text_chunks = [] + rag.entities_vdb = None + rag.relationships_vdb = None + return InsertResponse( + status="success", + message="All documents cleared successfully", + document_count=0, + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @app.get("/health") + async def get_status(): + """Get current system status""" + return { + "status": "healthy", + "working_directory": str(args.working_dir), + "input_directory": str(args.input_dir), + "indexed_files": len(doc_manager.indexed_files), + "configuration": { + "model": args.model, + "embedding_model": args.embedding_model, + "max_tokens": args.max_tokens, + "embedding_dim": embedding_dim, + }, + } + + return app + + +if __name__ == "__main__": + args = parse_args() + import uvicorn + app = create_app(args) + uvicorn.run(app, host=args.host, port=args.port) diff --git a/api/requirements.txt b/api/requirements.txt index 6871ffd8..c83c3382 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -2,3 +2,16 @@ ascii_colors fastapi python-multipart uvicorn +nest_asyncio +lightrag-hku +tqdm +aioboto3 +numpy +ollama +torch +openai +tenacity +transformers +tiktoken +nano_vectordb +python-dotenv \ No newline at end of file diff --git a/examples/.env.oai.example b/examples/.env.oai.example new file mode 100644 index 00000000..288d7ff3 --- /dev/null +++ b/examples/.env.oai.example @@ -0,0 +1,7 @@ +AZURE_OPENAI_API_VERSION=2024-08-01-preview +AZURE_OPENAI_DEPLOYMENT=gpt-4o +AZURE_OPENAI_API_KEY=myapikey +AZURE_OPENAI_ENDPOINT=https://myendpoint.openai.azure.com + +AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large +AZURE_EMBEDDING_API_VERSION=2023-05-15 \ No newline at end of file diff --git a/lightrag/llm.py b/lightrag/llm.py index 190039f1..0b844204 100644 --- a/lightrag/llm.py +++ b/lightrag/llm.py @@ -140,12 +140,34 @@ async def azure_openai_complete_if_cache( if prompt is not None: messages.append({"role": "user", "content": prompt}) - response = await openai_async_client.chat.completions.create( - model=model, messages=messages, **kwargs - ) - content = response.choices[0].message.content + if "response_format" in kwargs: + response = await 
openai_async_client.beta.chat.completions.parse( + model=model, messages=messages, **kwargs + ) + else: + response = await openai_async_client.chat.completions.create( + model=model, messages=messages, **kwargs + ) + + if hasattr(response, "__aiter__"): - return content + async def inner(): + async for chunk in response: + if len(chunk.choices) == 0: + continue + content = chunk.choices[0].delta.content + if content is None: + continue + if r"\u" in content: + content = safe_unicode_decode(content.encode("utf-8")) + yield content + + return inner() + else: + content = response.choices[0].message.content + if r"\u" in content: + content = safe_unicode_decode(content.encode("utf-8")) + return content class BedrockError(Exception):
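
With this change, `azure_openai_complete_if_cache` returns a plain string for regular calls and an async generator of text chunks when the caller passes `stream=True` (the Azure client then returns an async-iterable response). A minimal caller sketch, assuming the Azure variables from `.env.aoi.example` are exported, could look like this:

```python
import asyncio
import inspect
import os

from lightrag.llm import azure_openai_complete_if_cache


async def main():
    # stream=True is forwarded to the Azure OpenAI client, so the function
    # returns an async generator of text chunks instead of a finished string.
    result = await azure_openai_complete_if_cache(
        os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o"),
        "Summarize LightRAG in one sentence.",
        base_url=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
        stream=True,
    )
    if inspect.isasyncgen(result):
        async for chunk in result:
            print(chunk, end="", flush=True)
    else:
        print(result)


asyncio.run(main())
```

This mirrors how the `/query/stream` handler in `azure_openai_lightrag_server.py` treats the two cases: it checks `inspect.isasyncgen` on the result and either streams the chunks or returns the complete response.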