Merge branch 'HKUDS:main' into feature-implementation

This commit is contained in:
Gurjot Singh
2025-01-16 10:53:01 +05:30
committed by GitHub
5 changed files with 376 additions and 42 deletions

View File

@@ -26,7 +26,7 @@ This repository hosts the code of LightRAG. The structure of this code is based
</div>
## 🎉 News
- [x] [2025.01.13]🎯📢Our team has launched [MiniRAG](https://github.com/HKUDS/MiniRAG) for small models.
- [x] [2025.01.13]🎯📢Our team has released [MiniRAG](https://github.com/HKUDS/MiniRAG) making RAG simpler with small models.
- [x] [2025.01.06]🎯📢You can now [use PostgreSQL for Storage](#using-postgresql-for-storage).
- [x] [2024.12.31]🎯📢LightRAG now supports [deletion by document ID](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#delete).
- [x] [2024.11.25]🎯📢LightRAG now supports seamless integration of [custom knowledge graphs](https://github.com/HKUDS/LightRAG?tab=readme-ov-file#insert-custom-kg), empowering users to enhance the system with their own domain expertise.
@@ -361,6 +361,7 @@ see test_neo4j.py for a working example.
### Using PostgreSQL for Storage
For production level scenarios you will most likely want to leverage an enterprise solution. PostgreSQL can provide a one-stop solution for you as KV store, VectorDB (pgvector) and GraphDB (apache AGE).
* PostgreSQL is lightweight,the whole binary distribution including all necessary plugins can be zipped to 40MB: Ref to [Windows Release](https://github.com/ShanGor/apache-age-windows/releases/tag/PG17%2Fv1.5.0-rc0) as it is easy to install for Linux/Mac.
* If you prefer docker, please start with this image if you are a beginner to avoid hiccups (DO read the overview): https://hub.docker.com/r/shangor/postgres-for-rag
* How to start? Ref to: [examples/lightrag_zhipu_postgres_demo.py](https://github.com/HKUDS/LightRAG/blob/main/examples/lightrag_zhipu_postgres_demo.py)
* Create index for AGE example: (Change below `dickens` to your graph name if necessary)
```

View File

@@ -0,0 +1,97 @@
"""
Sometimes you need to switch a storage solution, but you want to save LLM token and time.
This handy script helps you to copy the LLM caches from one storage solution to another.
(Not all the storage impl are supported)
"""
import asyncio
import logging
import os
from dotenv import load_dotenv
from lightrag.kg.postgres_impl import PostgreSQLDB, PGKVStorage
from lightrag.storage import JsonKVStorage
load_dotenv()
ROOT_DIR = os.environ.get("ROOT_DIR")
WORKING_DIR = f"{ROOT_DIR}/dickens"
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
# AGE
os.environ["AGE_GRAPH_NAME"] = "chinese"
postgres_db = PostgreSQLDB(
config={
"host": "localhost",
"port": 15432,
"user": "rag",
"password": "rag",
"database": "r2",
}
)
async def copy_from_postgres_to_json():
await postgres_db.initdb()
from_llm_response_cache = PGKVStorage(
namespace="llm_response_cache",
global_config={"embedding_batch_num": 6},
embedding_func=None,
db=postgres_db,
)
to_llm_response_cache = JsonKVStorage(
namespace="llm_response_cache",
global_config={"working_dir": WORKING_DIR},
embedding_func=None,
)
kv = {}
for c_id in await from_llm_response_cache.all_keys():
print(f"Copying {c_id}")
workspace = c_id["workspace"]
mode = c_id["mode"]
_id = c_id["id"]
postgres_db.workspace = workspace
obj = await from_llm_response_cache.get_by_mode_and_id(mode, _id)
if mode not in kv:
kv[mode] = {}
kv[mode][_id] = obj[_id]
print(f"Object {obj}")
await to_llm_response_cache.upsert(kv)
await to_llm_response_cache.index_done_callback()
print("Mission accomplished!")
async def copy_from_json_to_postgres():
await postgres_db.initdb()
from_llm_response_cache = JsonKVStorage(
namespace="llm_response_cache",
global_config={"working_dir": WORKING_DIR},
embedding_func=None,
)
to_llm_response_cache = PGKVStorage(
namespace="llm_response_cache",
global_config={"embedding_batch_num": 6},
embedding_func=None,
db=postgres_db,
)
for mode in await from_llm_response_cache.all_keys():
print(f"Copying {mode}")
caches = await from_llm_response_cache.get_by_id(mode)
for k, v in caches.items():
item = {mode: {k: v}}
print(f"\tCopying {item}")
await to_llm_response_cache.upsert(item)
if __name__ == "__main__":
asyncio.run(copy_from_json_to_postgres())

View File

@@ -9,7 +9,7 @@ from lightrag.llm import openai_complete_if_cache, openai_embedding
from lightrag.llm import azure_openai_complete_if_cache, azure_openai_embedding
from lightrag.utils import EmbeddingFunc
from typing import Optional, List
from typing import Optional, List, Union
from enum import Enum
from pathlib import Path
import shutil
@@ -22,6 +22,7 @@ from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from starlette.status import HTTP_403_FORBIDDEN
import pipmaster as pm
def get_default_host(binding_type: str) -> str:
@@ -174,7 +175,11 @@ def parse_args():
class DocumentManager:
"""Handles document operations and tracking"""
def __init__(self, input_dir: str, supported_extensions: tuple = (".txt", ".md")):
def __init__(
self,
input_dir: str,
supported_extensions: tuple = (".txt", ".md", ".pdf", ".docx", ".pptx"),
):
self.input_dir = Path(input_dir)
self.supported_extensions = supported_extensions
self.indexed_files = set()
@@ -289,7 +294,7 @@ def create_app(args):
+ "(With authentication)"
if api_key
else "",
version="1.0.1",
version="1.0.2",
openapi_tags=[{"name": "api"}],
)
@@ -356,6 +361,80 @@ def create_app(args):
),
)
async def index_file(file_path: Union[str, Path]) -> None:
"""Index all files inside the folder with support for multiple file formats
Args:
file_path: Path to the file to be indexed (str or Path object)
Raises:
ValueError: If file format is not supported
FileNotFoundError: If file doesn't exist
"""
if not pm.is_installed("aiofiles"):
pm.install("aiofiles")
# Convert to Path object if string
file_path = Path(file_path)
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
content = ""
# Get file extension in lowercase
ext = file_path.suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
# PDF handling
reader = PdfReader(str(file_path))
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
# Word document handling
doc = Document(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
# PowerPoint handling
prs = Presentation(file_path)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise ValueError(f"Unsupported file format: {ext}")
# Insert content into RAG system
if content:
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Successfully indexed file: {file_path}")
else:
logging.warning(f"No content extracted from file: {file_path}")
@app.on_event("startup")
async def startup_event():
"""Index all files in input directory during startup"""
@@ -363,13 +442,7 @@ def create_app(args):
new_files = doc_manager.scan_directory()
for file_path in new_files:
try:
# Use async file reading
async with aiofiles.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
# Use the async version of insert directly
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
logging.info(f"Indexed file: {file_path}")
await index_file(file_path)
except Exception as e:
trace_exception(e)
logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -388,10 +461,7 @@ def create_app(args):
for file_path in new_files:
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
await index_file(file_path)
indexed_count += 1
except Exception as e:
logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -419,10 +489,7 @@ def create_app(args):
shutil.copyfileobj(file.file, buffer)
# Immediately index the uploaded file
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
await rag.ainsert(content)
doc_manager.mark_as_indexed(file_path)
await index_file(file_path)
return {
"status": "success",
@@ -498,26 +565,103 @@ def create_app(args):
dependencies=[Depends(optional_api_key)],
)
async def insert_file(file: UploadFile = File(...), description: str = Form(None)):
try:
content = await file.read()
"""Insert a file directly into the RAG system
if file.filename.endswith((".txt", ".md")):
text = content.decode("utf-8")
await rag.ainsert(text)
else:
Args:
file: Uploaded file
description: Optional description of the file
Returns:
InsertResponse: Status of the insertion operation
Raises:
HTTPException: For unsupported file types or processing errors
"""
try:
content = ""
# Get file extension in lowercase
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
# Text files handling
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
# Read PDF from memory
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
# Read DOCX from memory
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
# Read PPTX from memory
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
content = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
raise HTTPException(
status_code=400,
detail="Unsupported file type. Only .txt and .md files are supported",
detail=f"Unsupported file type. Supported types: {doc_manager.supported_extensions}",
)
# Insert content into RAG system
if content:
# Add description if provided
if description:
content = f"{description}\n\n{content}"
await rag.ainsert(content)
logging.info(f"Successfully indexed file: {file.filename}")
return InsertResponse(
status="success",
message=f"File '{file.filename}' successfully inserted",
document_count=1,
)
else:
raise HTTPException(
status_code=400,
detail="No content could be extracted from the file",
)
except UnicodeDecodeError:
raise HTTPException(status_code=400, detail="File encoding not supported")
except Exception as e:
logging.error(f"Error processing file {file.filename}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post(
@@ -526,32 +670,110 @@ def create_app(args):
dependencies=[Depends(optional_api_key)],
)
async def insert_batch(files: List[UploadFile] = File(...)):
"""Process multiple files in batch mode
Args:
files: List of files to process
Returns:
InsertResponse: Status of the batch insertion operation
Raises:
HTTPException: For processing errors
"""
try:
inserted_count = 0
failed_files = []
for file in files:
try:
content = await file.read()
if file.filename.endswith((".txt", ".md")):
text = content.decode("utf-8")
await rag.ainsert(text)
inserted_count += 1
else:
content = ""
ext = Path(file.filename).suffix.lower()
match ext:
case ".txt" | ".md":
text_content = await file.read()
content = text_content.decode("utf-8")
case ".pdf":
if not pm.is_installed("pypdf2"):
pm.install("pypdf2")
from pypdf2 import PdfReader
from io import BytesIO
pdf_content = await file.read()
pdf_file = BytesIO(pdf_content)
reader = PdfReader(pdf_file)
for page in reader.pages:
content += page.extract_text() + "\n"
case ".docx":
if not pm.is_installed("docx"):
pm.install("docx")
from docx import Document
from io import BytesIO
docx_content = await file.read()
docx_file = BytesIO(docx_content)
doc = Document(docx_file)
content = "\n".join(
[paragraph.text for paragraph in doc.paragraphs]
)
case ".pptx":
if not pm.is_installed("pptx"):
pm.install("pptx")
from pptx import Presentation
from io import BytesIO
pptx_content = await file.read()
pptx_file = BytesIO(pptx_content)
prs = Presentation(pptx_file)
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
content += shape.text + "\n"
case _:
failed_files.append(f"{file.filename} (unsupported type)")
continue
if content:
await rag.ainsert(content)
inserted_count += 1
logging.info(f"Successfully indexed file: {file.filename}")
else:
failed_files.append(f"{file.filename} (no content extracted)")
except UnicodeDecodeError:
failed_files.append(f"{file.filename} (encoding error)")
except Exception as e:
failed_files.append(f"{file.filename} ({str(e)})")
logging.error(f"Error processing file {file.filename}: {str(e)}")
status_message = f"Successfully inserted {inserted_count} documents"
# Prepare status message
if inserted_count == len(files):
status = "success"
status_message = f"Successfully inserted all {inserted_count} documents"
elif inserted_count > 0:
status = "partial_success"
status_message = f"Successfully inserted {inserted_count} out of {len(files)} documents"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
else:
status = "failure"
status_message = "No documents were successfully inserted"
if failed_files:
status_message += f". Failed files: {', '.join(failed_files)}"
return InsertResponse(
status="success" if inserted_count > 0 else "partial_success",
status=status,
message=status_message,
document_count=len(files),
document_count=inserted_count,
)
except Exception as e:
logging.error(f"Batch processing error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete(

View File

@@ -7,6 +7,7 @@ nest_asyncio
numpy
ollama
openai
pipmaster
python-dotenv
python-multipart
tenacity

View File

@@ -231,6 +231,16 @@ class PGKVStorage(BaseKVStorage):
else:
return None
async def all_keys(self) -> list[dict]:
if "llm_response_cache" == self.namespace:
sql = "select workspace,mode,id from lightrag_llm_cache"
res = await self.db.query(sql, multirows=True)
return res
else:
logger.error(
f"all_keys is only implemented for llm_response_cache, not for {self.namespace}"
)
async def filter_keys(self, keys: List[str]) -> Set[str]:
"""Filter out duplicated content"""
sql = SQL_TEMPLATES["filter_keys"].format(
@@ -412,7 +422,10 @@ class PGDocStatusStorage(DocStatusStorage):
async def filter_keys(self, data: list[str]) -> set[str]:
"""Return keys that don't exist in storage"""
sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({",".join([f"'{_id}'" for _id in data])})"
keys = ",".join([f"'{_id}'" for _id in data])
sql = (
f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace=$1 AND id IN ({keys})"
)
result = await self.db.query(sql, {"workspace": self.db.workspace}, True)
# The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
if result is None: