Merge branch 'main' into postgres-improve-entities-relation-save-process

This commit is contained in:
zrguo
2025-03-17 15:39:40 +08:00
committed by GitHub
56 changed files with 2082 additions and 897 deletions

View File

@@ -1,5 +1,5 @@
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
__version__ = "1.2.5"
__version__ = "1.2.6"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/LightRAG"

View File

@@ -224,7 +224,7 @@ LightRAG supports binding to various LLM/Embedding backends:
Use the environment variable `LLM_BINDING` or the CLI argument `--llm-binding` to select the LLM backend type. Use the environment variable `EMBEDDING_BINDING` or the CLI argument `--embedding-binding` to select the embedding backend type.
### Entity Extraction Configuration
* ENABLE_LLM_CACHE_FOR_EXTRACT: Enable LLM cache for entity extraction (default: false)
* ENABLE_LLM_CACHE_FOR_EXTRACT: Enable LLM cache for entity extraction (default: true)
It is very common to set `ENABLE_LLM_CACHE_FOR_EXTRACT` to true in test environments to reduce the cost of LLM calls.
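A minimal sketch of enabling the cache from Python before the server parses its configuration (the variable name comes from this section; everything else is illustrative):

```python
# Illustrative only: enable the LLM cache for entity extraction in a test run.
# The server reads ENABLE_LLM_CACHE_FOR_EXTRACT as a boolean.
import os

os.environ["ENABLE_LLM_CACHE_FOR_EXTRACT"] = "true"
```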

View File

@@ -141,7 +141,7 @@ Start the LightRAG server using specified options:
lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
```
Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
Replace the `port` value with your desired port number (default is 9621) and `your-secret-key` with a secure key.
## Conclusion

View File

@@ -59,7 +59,7 @@ logconfig_dict = {
},
"filters": {
"path_filter": {
"()": "lightrag.api.lightrag_server.LightragPathFilter",
"()": "lightrag.utils.LightragPathFilter",
},
},
"loggers": {

View File

@@ -55,41 +55,6 @@ config = configparser.ConfigParser()
config.read("config.ini")
class LightragPathFilter(logging.Filter):
"""Filter for lightrag logger to filter out frequent path access logs"""
def __init__(self):
super().__init__()
# Define paths to be filtered
self.filtered_paths = ["/documents", "/health", "/webui/"]
def filter(self, record):
try:
# Check if record has the required attributes for an access log
if not hasattr(record, "args") or not isinstance(record.args, tuple):
return True
if len(record.args) < 5:
return True
# Extract method, path and status from the record args
method = record.args[1]
path = record.args[2]
status = record.args[4]
# Filter out successful GET requests to filtered paths
if (
method == "GET"
and (status == 200 or status == 304)
and path in self.filtered_paths
):
return False
return True
except Exception:
# In case of any error, let the message through
return True
def create_app(args):
# Setup logging
logger.setLevel(args.log_level)
@@ -177,6 +142,9 @@ def create_app(args):
if api_key
else "",
version=__api_version__,
openapi_url="/openapi.json", # Explicitly set OpenAPI schema URL
docs_url="/docs", # Explicitly set docs URL
redoc_url="/redoc", # Explicitly set redoc URL
openapi_tags=[{"name": "api"}],
lifespan=lifespan,
)
@@ -423,12 +391,24 @@ def create_app(args):
"update_status": update_status,
}
# Custom StaticFiles class to prevent caching of HTML files
class NoCacheStaticFiles(StaticFiles):
async def get_response(self, path: str, scope):
response = await super().get_response(path, scope)
if path.endswith(".html"):
response.headers["Cache-Control"] = (
"no-cache, no-store, must-revalidate"
)
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response
# Mount the WebUI (serves webui/index.html)
static_dir = Path(__file__).parent / "webui"
static_dir.mkdir(exist_ok=True)
app.mount(
"/webui",
StaticFiles(directory=static_dir, html=True, check_dir=True),
NoCacheStaticFiles(directory=static_dir, html=True, check_dir=True),
name="webui",
)
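As a quick check, a hedged sketch of verifying the new no-cache behaviour, assuming a LightRAG server is running locally on the default port 9621 without an API key (`httpx` is already in the requirements):

```python
# HTML served under /webui should now carry no-cache headers.
import httpx

resp = httpx.get("http://localhost:9621/webui/index.html")
print(resp.headers.get("cache-control"))  # expected: no-cache, no-store, must-revalidate
```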
@@ -516,7 +496,7 @@ def configure_logging():
},
"filters": {
"path_filter": {
"()": "lightrag.api.lightrag_server.LightragPathFilter",
"()": "lightrag.utils.LightragPathFilter",
},
},
}
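For reference, a minimal sketch of wiring the relocated filter into a standard `dictConfig`; the `lightrag.utils.LightragPathFilter` path is taken from this diff, while the handler and the `uvicorn.access` logger name are assumptions:

```python
# Minimal sketch, not the project's full logging config: attach LightragPathFilter
# so successful GETs to /documents, /health and /webui/ are dropped from access logs.
import logging.config

logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "filters": {
        "path_filter": {"()": "lightrag.utils.LightragPathFilter"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "filters": ["path_filter"],
        },
    },
    "loggers": {
        # Assumed logger name; the filter inspects uvicorn-style access-log records.
        "uvicorn.access": {"handlers": ["console"], "level": "INFO"},
    },
})
```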

View File

@@ -3,6 +3,7 @@ ascii_colors
asyncpg
distro
fastapi
graspologic>=3.4.1
httpcore
httpx
jiter

View File

@@ -364,7 +364,7 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
# Inject LLM cache configuration
args.enable_llm_cache_for_extract = get_env_value(
"ENABLE_LLM_CACHE_FOR_EXTRACT", False, bool
"ENABLE_LLM_CACHE_FOR_EXTRACT", True, bool
)
# Select Document loading tool (DOCLING, DEFAULT)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -2,11 +2,14 @@
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate" />
<meta http-equiv="Pragma" content="no-cache" />
<meta http-equiv="Expires" content="0" />
<link rel="icon" type="image/svg+xml" href="./logo.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="./assets/index-BlVvSIic.js"></script>
<link rel="stylesheet" crossorigin href="./assets/index-CH-3l4_Z.css">
<script type="module" crossorigin src="./assets/index-DwcJE583.js"></script>
<link rel="stylesheet" crossorigin href="./assets/index-BV5s8k-a.css">
</head>
<body>
<div id="root"></div>

View File

@@ -767,7 +767,7 @@ class PGDocStatusStorage(DocStatusStorage):
result = await self.db.query(sql, params, True)
docs_by_status = {
element["id"]: DocProcessingStatus(
content=result[0]["content"],
content=element["content"],
content_summary=element["content_summary"],
content_length=element["content_length"],
status=element["status"],
@@ -1572,7 +1572,7 @@ TABLES = {
content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL,
chunk_id TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -1586,7 +1586,7 @@ TABLES = {
content_vector VECTOR,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
chunk_ids VARCHAR(255)[] NULL,
chunk_id TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
)"""
},
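For existing PostgreSQL deployments created with the old schema, a hypothetical migration sketch (not part of this commit; the table and column names come from the diff, the DSN is a placeholder):

```python
# Hypothetical migration, assuming the old chunk_ids column exists:
# replace it with the chunk_id TEXT column introduced by this change.
import asyncio
import asyncpg

async def migrate(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        for table in ("LIGHTRAG_VDB_ENTITY", "LIGHTRAG_VDB_RELATION"):
            await conn.execute(f"ALTER TABLE {table} DROP COLUMN IF EXISTS chunk_ids")
            await conn.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS chunk_id TEXT NULL")
    finally:
        await conn.close()

asyncio.run(migrate("postgresql://user:password@localhost:5432/lightrag"))
```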

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import asyncio
import configparser
import os
import csv
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, AsyncIterator, Callable, Iterator, cast, final
from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
import pandas as pd
from lightrag.kg import (
STORAGE_ENV_REQUIREMENTS,
@@ -211,7 +214,7 @@ class LightRAG:
llm_model_max_token_size: int = field(default=int(os.getenv("MAX_TOKENS", 32768)))
"""Maximum number of tokens allowed per LLM response."""
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 16)))
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
"""Maximum number of concurrent LLM calls."""
llm_model_kwargs: dict[str, Any] = field(default_factory=dict)
@@ -235,7 +238,7 @@ class LightRAG:
# Extensions
# ---
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 20)))
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
"""Maximum number of parallel insert operations."""
addon_params: dict[str, Any] = field(
@@ -550,6 +553,7 @@ class LightRAG:
Args:
input: Single document string or list of document strings
split_by_character: if split_by_character is not None, split the string by this character; chunks longer than
chunk_token_size are split again by token size.
split_by_character_only: if split_by_character_only is True, split the string by character only; when
split_by_character is None, this parameter is ignored.
ids: a single document ID or a list of unique document IDs; if not provided, MD5 hash IDs will be generated
@@ -571,6 +575,7 @@ class LightRAG:
Args:
input: Single document string or list of document strings
split_by_character: if split_by_character is not None, split the string by this character; chunks longer than
chunk_token_size are split again by token size.
split_by_character_only: if split_by_character_only is True, split the string by character only; when
split_by_character is None, this parameter is ignored.
ids: list of unique document IDs; if not provided, MD5 hash IDs will be generated
@@ -764,7 +769,6 @@ class LightRAG:
async with pipeline_status_lock:
# Ensure only one worker is processing documents
if not pipeline_status.get("busy", False):
# First check whether there are any documents that need processing
processing_docs, failed_docs, pending_docs = await asyncio.gather(
self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
self.doc_status.get_docs_by_status(DocStatus.FAILED),
@@ -776,12 +780,10 @@ class LightRAG:
to_process_docs.update(failed_docs)
to_process_docs.update(pending_docs)
# If there are no documents to process, return directly and leave pipeline_status unchanged
if not to_process_docs:
logger.info("No documents to process")
return
# There are documents to process; update pipeline_status
pipeline_status.update(
{
"busy": True,
@@ -820,7 +822,7 @@ class LightRAG:
for i in range(0, len(to_process_docs), self.max_parallel_insert)
]
log_message = f"Number of batches to process: {len(docs_batches)}."
log_message = f"Processing {len(to_process_docs)} document(s) in {len(docs_batches)} batches"
logger.info(log_message)
# Update pipeline status with current batch information
@@ -829,140 +831,149 @@ class LightRAG:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
batches: list[Any] = []
# 3. iterate over batches
for batch_idx, docs_batch in enumerate(docs_batches):
# Update current batch in pipeline status (directly, as it's atomic)
pipeline_status["cur_batch"] += 1
async def batch(
batch_idx: int,
docs_batch: list[tuple[str, DocProcessingStatus]],
size_batch: int,
) -> None:
log_message = (
f"Start processing batch {batch_idx + 1} of {size_batch}."
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# 4. iterate over batch
for doc_id_processing_status in docs_batch:
doc_id, status_doc = doc_id_processing_status
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_id,
}
for dp in self.chunking_func(
status_doc.content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
self.tiktoken_model_name,
)
async def process_document(
doc_id: str,
status_doc: DocProcessingStatus,
split_by_character: str | None,
split_by_character_only: bool,
pipeline_status: dict,
pipeline_status_lock: asyncio.Lock,
) -> None:
"""Process single document"""
try:
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_id,
}
# Process document (text chunks and full docs) in parallel
# Create tasks with references for potential cancellation
doc_status_task = asyncio.create_task(
self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
}
for dp in self.chunking_func(
status_doc.content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
self.tiktoken_model_name,
)
}
# Process document (text chunks and full docs) in parallel
# Create tasks with references for potential cancellation
doc_status_task = asyncio.create_task(
self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
}
)
}
)
chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks)
)
chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks)
)
entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(
chunks, pipeline_status, pipeline_status_lock
)
entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(
chunks, pipeline_status, pipeline_status_lock
)
)
full_docs_task = asyncio.create_task(
self.full_docs.upsert(
{doc_id: {"content": status_doc.content}}
)
full_docs_task = asyncio.create_task(
self.full_docs.upsert(
{doc_id: {"content": status_doc.content}}
)
)
text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks)
)
tasks = [
doc_status_task,
)
text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks)
)
tasks = [
doc_status_task,
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]
await asyncio.gather(*tasks)
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
except Exception as e:
# Log error and update pipeline status
error_msg = f"Failed to process document {doc_id}: {str(e)}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)
# Cancel other tasks as they are no longer meaningful
for task in [
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]
try:
await asyncio.gather(*tasks)
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
except Exception as e:
# Log error and update pipeline status
error_msg = (
f"Failed to process document {doc_id}: {str(e)}"
)
logger.error(error_msg)
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)
# Cancel other tasks as they are no longer meaningful
for task in [
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]:
if not task.done():
task.cancel()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
continue
log_message = (
f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
]:
if not task.done():
task.cancel()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
# 3. iterate over batches
total_batches = len(docs_batches)
for batch_idx, docs_batch in enumerate(docs_batches):
current_batch = batch_idx + 1
log_message = (
f"Start processing batch {current_batch} of {total_batches}."
)
logger.info(log_message)
pipeline_status["cur_batch"] = current_batch
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
await asyncio.gather(*batches)
await self._insert_done()
doc_tasks = []
for doc_id, status_doc in docs_batch:
doc_tasks.append(
process_document(
doc_id,
status_doc,
split_by_character,
split_by_character_only,
pipeline_status,
pipeline_status_lock,
)
)
# Process documents in one batch in parallel
await asyncio.gather(*doc_tasks)
await self._insert_done()
log_message = f"Completed batch {current_batch} of {total_batches}."
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
@@ -1037,7 +1048,7 @@ class LightRAG:
]
await asyncio.gather(*tasks)
log_message = "All Insert done"
log_message = "In memory DB persist to disk"
logger.info(log_message)
if pipeline_status is not None and pipeline_status_lock is not None:
@@ -1111,6 +1122,7 @@ class LightRAG:
# Prepare node data
node_data: dict[str, str] = {
"entity_id": entity_name,
"entity_type": entity_type,
"description": description,
"source_id": source_id,
@@ -1148,6 +1160,7 @@ class LightRAG:
await self.chunk_entity_relation_graph.upsert_node(
need_insert_id,
node_data={
"entity_id": need_insert_id,
"source_id": source_id,
"description": "UNKNOWN",
"entity_type": "UNKNOWN",
@@ -1838,9 +1851,10 @@ class LightRAG:
"""
try:
# 1. Get current entity information
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
if not node_data:
node_exists = await self.chunk_entity_relation_graph.has_node(entity_name)
if not node_exists:
raise ValueError(f"Entity '{entity_name}' does not exist")
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
# Check if entity is being renamed
new_entity_name = updated_data.get("entity_name", entity_name)
@@ -1853,7 +1867,7 @@ class LightRAG:
"Entity renaming is not allowed. Set allow_rename=True to enable this feature"
)
existing_node = await self.chunk_entity_relation_graph.get_node(
existing_node = await self.chunk_entity_relation_graph.has_node(
new_entity_name
)
if existing_node:
@@ -2035,14 +2049,16 @@ class LightRAG:
"""
try:
# 1. Get current relation information
edge_data = await self.chunk_entity_relation_graph.get_edge(
edge_exists = await self.chunk_entity_relation_graph.has_edge(
source_entity, target_entity
)
if not edge_data:
if not edge_exists:
raise ValueError(
f"Relation from '{source_entity}' to '{target_entity}' does not exist"
)
edge_data = await self.chunk_entity_relation_graph.get_edge(
source_entity, target_entity
)
# Important: First delete the old relation record from the vector database
old_relation_id = compute_mdhash_id(
source_entity + target_entity, prefix="rel-"
@@ -2151,12 +2167,13 @@ class LightRAG:
"""
try:
# Check if entity already exists
existing_node = await self.chunk_entity_relation_graph.get_node(entity_name)
existing_node = await self.chunk_entity_relation_graph.has_node(entity_name)
if existing_node:
raise ValueError(f"Entity '{entity_name}' already exists")
# Prepare node data with defaults if missing
node_data = {
"entity_id": entity_name,
"entity_type": entity_data.get("entity_type", "UNKNOWN"),
"description": entity_data.get("description", ""),
"source_id": entity_data.get("source_id", "manual"),
@@ -2244,7 +2261,7 @@ class LightRAG:
raise ValueError(f"Target entity '{target_entity}' does not exist")
# Check if relation already exists
existing_edge = await self.chunk_entity_relation_graph.get_edge(
existing_edge = await self.chunk_entity_relation_graph.has_edge(
source_entity, target_entity
)
if existing_edge:
@@ -2377,19 +2394,22 @@ class LightRAG:
# 1. Check if all source entities exist
source_entities_data = {}
for entity_name in source_entities:
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
if not node_data:
node_exists = await self.chunk_entity_relation_graph.has_node(
entity_name
)
if not node_exists:
raise ValueError(f"Source entity '{entity_name}' does not exist")
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
source_entities_data[entity_name] = node_data
# 2. Check if target entity exists and get its data if it does
target_exists = await self.chunk_entity_relation_graph.has_node(
target_entity
)
target_entity_data = {}
existing_target_entity_data = {}
if target_exists:
target_entity_data = await self.chunk_entity_relation_graph.get_node(
target_entity
existing_target_entity_data = (
await self.chunk_entity_relation_graph.get_node(target_entity)
)
logger.info(
f"Target entity '{target_entity}' already exists, will merge data"
@@ -2398,7 +2418,7 @@ class LightRAG:
# 3. Merge entity data
merged_entity_data = self._merge_entity_attributes(
list(source_entities_data.values())
+ ([target_entity_data] if target_exists else []),
+ ([existing_target_entity_data] if target_exists else []),
merge_strategy,
)
@@ -2592,6 +2612,322 @@ class LightRAG:
logger.error(f"Error merging entities: {e}")
raise
async def aexport_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Asynchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
include_vector_data: Whether to include data from the vector database.
"""
# Collect data
entities_data = []
relations_data = []
relationships_data = []
# --- Entities ---
all_entities = await self.chunk_entity_relation_graph.get_all_labels()
for entity_name in all_entities:
entity_info = await self.get_entity_info(
entity_name, include_vector_data=include_vector_data
)
entity_row = {
"entity_name": entity_name,
"source_id": entity_info["source_id"],
"graph_data": str(
entity_info["graph_data"]
), # Convert to string to ensure compatibility
}
if include_vector_data and "vector_data" in entity_info:
entity_row["vector_data"] = str(entity_info["vector_data"])
entities_data.append(entity_row)
# --- Relations ---
for src_entity in all_entities:
for tgt_entity in all_entities:
if src_entity == tgt_entity:
continue
edge_exists = await self.chunk_entity_relation_graph.has_edge(
src_entity, tgt_entity
)
if edge_exists:
relation_info = await self.get_relation_info(
src_entity, tgt_entity, include_vector_data=include_vector_data
)
relation_row = {
"src_entity": src_entity,
"tgt_entity": tgt_entity,
"source_id": relation_info["source_id"],
"graph_data": str(
relation_info["graph_data"]
), # Convert to string
}
if include_vector_data and "vector_data" in relation_info:
relation_row["vector_data"] = str(relation_info["vector_data"])
relations_data.append(relation_row)
# --- Relationships (from VectorDB) ---
all_relationships = await self.relationships_vdb.client_storage
for rel in all_relationships["data"]:
relationships_data.append(
{
"relationship_id": rel["__id__"],
"data": str(rel), # Convert to string for compatibility
}
)
# Export based on format
if file_format == "csv":
# CSV export
with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
# Entities
if entities_data:
csvfile.write("# ENTITIES\n")
writer = csv.DictWriter(csvfile, fieldnames=entities_data[0].keys())
writer.writeheader()
writer.writerows(entities_data)
csvfile.write("\n\n")
# Relations
if relations_data:
csvfile.write("# RELATIONS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relations_data[0].keys()
)
writer.writeheader()
writer.writerows(relations_data)
csvfile.write("\n\n")
# Relationships
if relationships_data:
csvfile.write("# RELATIONSHIPS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relationships_data[0].keys()
)
writer.writeheader()
writer.writerows(relationships_data)
elif file_format == "excel":
# Excel export
entities_df = (
pd.DataFrame(entities_data) if entities_data else pd.DataFrame()
)
relations_df = (
pd.DataFrame(relations_data) if relations_data else pd.DataFrame()
)
relationships_df = (
pd.DataFrame(relationships_data)
if relationships_data
else pd.DataFrame()
)
with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
if not entities_df.empty:
entities_df.to_excel(writer, sheet_name="Entities", index=False)
if not relations_df.empty:
relations_df.to_excel(writer, sheet_name="Relations", index=False)
if not relationships_df.empty:
relationships_df.to_excel(
writer, sheet_name="Relationships", index=False
)
elif file_format == "md":
# Markdown export
with open(output_path, "w", encoding="utf-8") as mdfile:
mdfile.write("# LightRAG Data Export\n\n")
# Entities
mdfile.write("## Entities\n\n")
if entities_data:
# Write header
mdfile.write("| " + " | ".join(entities_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(entities_data[0].keys()))
+ " |\n"
)
# Write rows
for entity in entities_data:
mdfile.write(
"| " + " | ".join(str(v) for v in entity.values()) + " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No entity data available*\n\n")
# Relations
mdfile.write("## Relations\n\n")
if relations_data:
# Write header
mdfile.write("| " + " | ".join(relations_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(relations_data[0].keys()))
+ " |\n"
)
# Write rows
for relation in relations_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relation.values())
+ " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No relation data available*\n\n")
# Relationships
mdfile.write("## Relationships\n\n")
if relationships_data:
# Write header
mdfile.write(
"| " + " | ".join(relationships_data[0].keys()) + " |\n"
)
mdfile.write(
"| "
+ " | ".join(["---"] * len(relationships_data[0].keys()))
+ " |\n"
)
# Write rows
for relationship in relationships_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relationship.values())
+ " |\n"
)
else:
mdfile.write("*No relationship data available*\n\n")
elif file_format == "txt":
# Plain text export
with open(output_path, "w", encoding="utf-8") as txtfile:
txtfile.write("LIGHTRAG DATA EXPORT\n")
txtfile.write("=" * 80 + "\n\n")
# Entities
txtfile.write("ENTITIES\n")
txtfile.write("-" * 80 + "\n")
if entities_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(e[k])) for e in entities_data))
for k in entities_data[0]
}
header = " ".join(k.ljust(col_widths[k]) for k in entities_data[0])
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for entity in entities_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in entity.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No entity data available\n\n")
# Relations
txtfile.write("RELATIONS\n")
txtfile.write("-" * 80 + "\n")
if relations_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relations_data))
for k in relations_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relations_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relation in relations_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relation.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No relation data available\n\n")
# Relationships
txtfile.write("RELATIONSHIPS\n")
txtfile.write("-" * 80 + "\n")
if relationships_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relationships_data))
for k in relationships_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relationships_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relationship in relationships_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relationship.items()
)
txtfile.write(row + "\n")
else:
txtfile.write("No relationship data available\n\n")
else:
raise ValueError(
f"Unsupported file format: {file_format}. "
f"Choose from: csv, excel, md, txt"
)
print(f"Data exported to: {output_path} with format: {file_format}")
def export_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Synchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
include_vector_data: Whether to include data from the vector database.
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
self.aexport_data(output_path, file_format, include_vector_data)
)
def merge_entities(
self,
source_entities: list[str],

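A short usage sketch for the export API added above (file names are placeholders; the Excel branch relies on pandas with the `xlsxwriter` engine, which must be installed separately):

```python
# Illustrative usage; rag is an initialized LightRAG instance with data already inserted.
rag.export_data("kg_export.csv")                        # CSV (default format)
rag.export_data("kg_export.xlsx", file_format="excel")  # Excel workbook, needs xlsxwriter
rag.export_data("kg_export.md", file_format="md")       # Markdown tables

# Async variant, including vector-database payloads in the export:
# await rag.aexport_data("kg_export.txt", file_format="txt", include_vector_data=True)
```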
View File

@@ -76,6 +76,7 @@ class LightragPathFilter(logging.Filter):
super().__init__()
# Define paths to be filtered
self.filtered_paths = ["/documents", "/health", "/webui/"]
# self.filtered_paths = ["/health", "/webui/"]
def filter(self, record):
try: