Merge branch 'main' into fix--postgres-impl

jofoks
2025-03-13 16:16:25 -07:00
24 changed files with 635 additions and 179 deletions

View File

@@ -1,5 +1,5 @@
from .lightrag import LightRAG as LightRAG, QueryParam as QueryParam
__version__ = "1.2.5"
__version__ = "1.2.6"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/LightRAG"

View File

@@ -59,7 +59,7 @@ logconfig_dict = {
},
"filters": {
"path_filter": {
"()": "lightrag.api.lightrag_server.LightragPathFilter",
"()": "lightrag.utils.LightragPathFilter",
},
},
"loggers": {

View File

@@ -55,41 +55,6 @@ config = configparser.ConfigParser()
config.read("config.ini")
class LightragPathFilter(logging.Filter):
"""Filter for lightrag logger to filter out frequent path access logs"""
def __init__(self):
super().__init__()
# Define paths to be filtered
self.filtered_paths = ["/documents", "/health", "/webui/"]
def filter(self, record):
try:
# Check if record has the required attributes for an access log
if not hasattr(record, "args") or not isinstance(record.args, tuple):
return True
if len(record.args) < 5:
return True
# Extract method, path and status from the record args
method = record.args[1]
path = record.args[2]
status = record.args[4]
# Filter out successful GET requests to filtered paths
if (
method == "GET"
and (status == 200 or status == 304)
and path in self.filtered_paths
):
return False
return True
except Exception:
# In case of any error, let the message through
return True
def create_app(args):
# Setup logging
logger.setLevel(args.log_level)
@@ -177,6 +142,9 @@ def create_app(args):
if api_key
else "",
version=__api_version__,
openapi_url="/openapi.json", # Explicitly set OpenAPI schema URL
docs_url="/docs", # Explicitly set docs URL
redoc_url="/redoc", # Explicitly set redoc URL
openapi_tags=[{"name": "api"}],
lifespan=lifespan,
)
@@ -516,7 +484,7 @@ def configure_logging():
},
"filters": {
"path_filter": {
"()": "lightrag.api.lightrag_server.LightragPathFilter",
"()": "lightrag.utils.LightragPathFilter",
},
},
}
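LightragPathFilter is now referenced through its new home in lightrag.utils in both dictConfig-style logging setups touched by this commit. A minimal sketch of wiring the relocated filter into a standard dictConfig (the handler and logger names here are illustrative assumptions, not taken from the repository):

import logging.config

# Minimal sketch: attach the relocated LightragPathFilter to an access logger.
# Handler/logger names below are assumptions for illustration only.
logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "filters": {
            "path_filter": {
                # Previously "lightrag.api.lightrag_server.LightragPathFilter"
                "()": "lightrag.utils.LightragPathFilter",
            },
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "filters": ["path_filter"],
            },
        },
        "loggers": {
            "uvicorn.access": {"handlers": ["console"], "level": "INFO"},
        },
    }
)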

View File

@@ -99,6 +99,37 @@ class DocsStatusesResponse(BaseModel):
statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
class PipelineStatusResponse(BaseModel):
"""Response model for pipeline status
Attributes:
autoscanned: Whether auto-scan has started
busy: Whether the pipeline is currently busy
job_name: Current job name (e.g., indexing files/indexing texts)
job_start: Job start time as ISO format string (optional)
docs: Total number of documents to be indexed
batchs: Number of batches for processing documents
cur_batch: Current processing batch
request_pending: Flag for pending request for processing
latest_message: Latest message from pipeline processing
history_messages: List of history messages
"""
autoscanned: bool = False
busy: bool = False
job_name: str = "Default Job"
job_start: Optional[str] = None
docs: int = 0
batchs: int = 0
cur_batch: int = 0
request_pending: bool = False
latest_message: str = ""
history_messages: Optional[List[str]] = None
class Config:
extra = "allow" # Allow additional fields from the pipeline status
class DocumentManager:
def __init__(
self,
@@ -247,7 +278,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
@@ -266,7 +297,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
@@ -286,7 +317,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
@@ -307,7 +338,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
if global_args["main_args"].document_loading_engine == "DOCLING":
if not pm.is_installed("docling"): # type: ignore
pm.install("docling")
from docling.document_converter import DocumentConverter
from docling.document_converter import DocumentConverter # type: ignore
converter = DocumentConverter()
result = converter.convert(file_path)
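The four hunks above only add `# type: ignore` to the deferred docling import; the loading flow itself is unchanged: install docling on demand, convert the file, then read the parsed document. A hedged sketch of that flow (the markdown-export call is an assumption about docling's result object, and the input path is hypothetical):

import pipmaster as pm  # assumed to be the `pm` helper used for on-demand installs

if not pm.is_installed("docling"):  # type: ignore
    pm.install("docling")
from docling.document_converter import DocumentConverter  # type: ignore

converter = DocumentConverter()
result = converter.convert("example.pdf")  # hypothetical input file
# Assumption: the converted document exposes a markdown exporter.
content = result.document.export_to_markdown()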
@@ -718,17 +749,33 @@ def create_document_routes(
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))
@router.get("/pipeline_status", dependencies=[Depends(optional_api_key)])
async def get_pipeline_status():
@router.get(
"/pipeline_status",
dependencies=[Depends(optional_api_key)],
response_model=PipelineStatusResponse,
)
async def get_pipeline_status() -> PipelineStatusResponse:
"""
Get the current status of the document indexing pipeline.
This endpoint returns information about the current state of the document processing pipeline,
including whether it's busy, the current job name, when it started, how many documents
are being processed, how many batches there are, and which batch is currently being processed.
including the processing status, progress information, and history messages.
Returns:
dict: A dictionary containing the pipeline status information
PipelineStatusResponse: A response object containing:
- autoscanned (bool): Whether auto-scan has started
- busy (bool): Whether the pipeline is currently busy
- job_name (str): Current job name (e.g., indexing files/indexing texts)
- job_start (str, optional): Job start time as ISO format string
- docs (int): Total number of documents to be indexed
- batchs (int): Number of batches for processing documents
- cur_batch (int): Current processing batch
- request_pending (bool): Flag for pending request for processing
- latest_message (str): Latest message from pipeline processing
- history_messages (List[str], optional): List of history messages
Raises:
HTTPException: If an error occurs while retrieving pipeline status (500)
"""
try:
from lightrag.kg.shared_storage import get_namespace_data
@@ -746,7 +793,7 @@ def create_document_routes(
if status_dict.get("job_start"):
status_dict["job_start"] = str(status_dict["job_start"])
return status_dict
return PipelineStatusResponse(**status_dict)
except Exception as e:
logger.error(f"Error getting pipeline status: {str(e)}")
logger.error(traceback.format_exc())
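Declaring response_model=PipelineStatusResponse gives clients a stable schema to validate the endpoint against. A small client-side sketch, assuming a local server; the base URL, route prefix, and API-key header name are illustrative assumptions:

from typing import List, Optional

import httpx
from pydantic import BaseModel


class PipelineStatus(BaseModel):
    """Client-side mirror of the server's PipelineStatusResponse."""

    autoscanned: bool = False
    busy: bool = False
    job_name: str = "Default Job"
    job_start: Optional[str] = None
    docs: int = 0
    batchs: int = 0
    cur_batch: int = 0
    request_pending: bool = False
    latest_message: str = ""
    history_messages: Optional[List[str]] = None

    class Config:
        extra = "allow"  # the pipeline may attach extra fields


resp = httpx.get(
    "http://localhost:9621/documents/pipeline_status",  # assumed host, port, and prefix
    headers={"X-API-Key": "secret"},  # assumed header name when an API key is configured
)
resp.raise_for_status()
status = PipelineStatus(**resp.json())
print(status.busy, status.cur_batch, "/", status.batchs)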

View File

@@ -5,7 +5,7 @@
<link rel="icon" type="image/svg+xml" href="./logo.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="./assets/index-CJz72b6Q.js"></script>
<script type="module" crossorigin src="./assets/index-BlVvSIic.js"></script>
<link rel="stylesheet" crossorigin href="./assets/index-CH-3l4_Z.css">
</head>
<body>

View File

@@ -156,7 +156,9 @@ class ChromaVectorDBStorage(BaseVectorStorage):
logger.error(f"Error during ChromaDB upsert: {str(e)}")
raise
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
try:
embedding = await self.embedding_func([query])
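Each vector backend below gains the same widened signature, so callers can pass an optional allow-list of IDs without breaking existing call sites; whether a backend honors or ignores the list is backend-specific. A minimal in-memory sketch of the pattern (the storage class and scoring are illustrative, not the project's):

from typing import Any


class InMemoryVectorStorage:
    """Illustrative stand-in for a BaseVectorStorage subclass."""

    def __init__(self, embedding_func, records: list[dict[str, Any]]):
        self.embedding_func = embedding_func
        self._records = records  # each record carries "__id__" and "__vector__"

    async def query(
        self, query: str, top_k: int, ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        embedding = (await self.embedding_func([query]))[0]
        candidates = self._records
        if ids is not None:
            # Optional allow-list: restrict the search space to the given IDs.
            allowed = set(ids)
            candidates = [r for r in candidates if r["__id__"] in allowed]
        # Naive dot-product scoring placeholder; real backends delegate to their engines.
        scored = sorted(
            candidates,
            key=lambda r: sum(a * b for a, b in zip(embedding, r["__vector__"])),
            reverse=True,
        )
        return scored[:top_k]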

View File

@@ -171,7 +171,9 @@ class FaissVectorDBStorage(BaseVectorStorage):
logger.info(f"Upserted {len(list_data)} vectors into Faiss index.")
return [m["__id__"] for m in list_data]
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
"""
Search by a textual query; returns top_k results with their metadata + similarity distance.
"""

View File

@@ -101,7 +101,9 @@ class MilvusVectorDBStorage(BaseVectorStorage):
results = self._client.upsert(collection_name=self.namespace, data=list_data)
return results
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
embedding = await self.embedding_func([query])
results = self._client.search(
collection_name=self.namespace,

View File

@@ -938,7 +938,9 @@ class MongoVectorDBStorage(BaseVectorStorage):
return list_data
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
"""Queries the vector database using Atlas Vector Search."""
# Generate the embedding
embedding = await self.embedding_func([query])

View File

@@ -120,7 +120,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
f"embedding is not 1-1 with data, {len(embeddings)} != {len(list_data)}"
)
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
# Execute embedding outside of lock to avoid long lock times
embedding = await self.embedding_func([query])
embedding = embedding[0]

View File

@@ -553,18 +553,6 @@ class Neo4JStorage(BaseGraphStorage):
logger.error(f"Error during upsert: {str(e)}")
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(
(
neo4jExceptions.ServiceUnavailable,
neo4jExceptions.TransientError,
neo4jExceptions.WriteServiceUnavailable,
neo4jExceptions.ClientError,
)
),
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
@@ -666,14 +654,14 @@ class Neo4JStorage(BaseGraphStorage):
main_query = """
MATCH (n)
OPTIONAL MATCH (n)-[r]-()
WITH n, count(r) AS degree
WITH n, COALESCE(count(r), 0) AS degree
WHERE degree >= $min_degree
ORDER BY degree DESC
LIMIT $max_nodes
WITH collect({node: n}) AS filtered_nodes
UNWIND filtered_nodes AS node_info
WITH collect(node_info.node) AS kept_nodes, filtered_nodes
MATCH (a)-[r]-(b)
OPTIONAL MATCH (a)-[r]-(b)
WHERE a IN kept_nodes AND b IN kept_nodes
RETURN filtered_nodes AS node_info,
collect(DISTINCT r) AS relationships
@@ -703,7 +691,7 @@ class Neo4JStorage(BaseGraphStorage):
WITH start, nodes, relationships
UNWIND nodes AS node
OPTIONAL MATCH (node)-[r]-()
WITH node, count(r) AS degree, start, nodes, relationships
WITH node, COALESCE(count(r), 0) AS degree, start, nodes, relationships
WHERE node = start OR EXISTS((start)--(node)) OR degree >= $min_degree
ORDER BY
CASE
@@ -716,7 +704,7 @@ class Neo4JStorage(BaseGraphStorage):
WITH collect({node: node}) AS filtered_nodes
UNWIND filtered_nodes AS node_info
WITH collect(node_info.node) AS kept_nodes, filtered_nodes
MATCH (a)-[r]-(b)
OPTIONAL MATCH (a)-[r]-(b)
WHERE a IN kept_nodes AND b IN kept_nodes
RETURN filtered_nodes AS node_info,
collect(DISTINCT r) AS relationships
@@ -744,11 +732,7 @@ class Neo4JStorage(BaseGraphStorage):
result.nodes.append(
KnowledgeGraphNode(
id=f"{node_id}",
labels=[
label
for label in node.labels
if label != "base"
],
labels=[node.get("entity_id")],
properties=dict(node),
)
)
@@ -865,9 +849,7 @@ class Neo4JStorage(BaseGraphStorage):
# Create KnowledgeGraphNode for target
target_node = KnowledgeGraphNode(
id=f"{target_id}",
labels=[
label for label in b_node.labels if label != "base"
],
labels=[f"{target_id}"],
properties=dict(b_node.properties),
)
@@ -907,9 +889,7 @@ class Neo4JStorage(BaseGraphStorage):
# Create initial KnowledgeGraphNode
start_node = KnowledgeGraphNode(
id=f"{node_record['n'].get('entity_id')}",
labels=[
label for label in node_record["n"].labels if label != "base"
],
labels=[f"{node_record['n'].get('entity_id')}"],
properties=dict(node_record["n"].properties),
)
finally:
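The Cypher edits above switch relationship matching to OPTIONAL MATCH and wrap the degree count in COALESCE, so isolated nodes keep a degree of 0 and are no longer dropped from the result. A hedged sketch of running the updated main query with the async Neo4j driver (connection details and parameter values are illustrative):

from neo4j import AsyncGraphDatabase

MAIN_QUERY = """
MATCH (n)
OPTIONAL MATCH (n)-[r]-()
WITH n, COALESCE(count(r), 0) AS degree
WHERE degree >= $min_degree
ORDER BY degree DESC
LIMIT $max_nodes
WITH collect({node: n}) AS filtered_nodes
UNWIND filtered_nodes AS node_info
WITH collect(node_info.node) AS kept_nodes, filtered_nodes
OPTIONAL MATCH (a)-[r]-(b)
WHERE a IN kept_nodes AND b IN kept_nodes
RETURN filtered_nodes AS node_info,
       collect(DISTINCT r) AS relationships
"""


async def fetch_graph(uri: str, user: str, password: str) -> None:
    driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
    try:
        async with driver.session() as session:
            result = await session.run(MAIN_QUERY, min_degree=0, max_nodes=1000)
            record = await result.single()
            if record:
                print(len(record["node_info"]), "nodes,",
                      len(record["relationships"]), "relationships")
    finally:
        await driver.close()

# Usage from an async entry point:
# asyncio.run(fetch_graph("neo4j://localhost:7687", "neo4j", "password"))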

View File

@@ -417,7 +417,9 @@ class OracleVectorDBStorage(BaseVectorStorage):
self.db = None
#################### query method ###############
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
embeddings = await self.embedding_func([query])
embedding = embeddings[0]
# Convert precision

View File

@@ -123,7 +123,9 @@ class QdrantVectorDBStorage(BaseVectorStorage):
)
return results
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
embedding = await self.embedding_func([query])
results = self._client.search(
collection_name=self.namespace,

View File

@@ -306,7 +306,9 @@ class TiDBVectorDBStorage(BaseVectorStorage):
await ClientManager.release_client(self.db)
self.db = None
async def query(self, query: str, top_k: int) -> list[dict[str, Any]]:
async def query(
self, query: str, top_k: int, ids: list[str] | None = None
) -> list[dict[str, Any]]:
"""Search from tidb vector"""
embeddings = await self.embedding_func([query])
embedding = embeddings[0]

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import asyncio
import configparser
import os
import csv
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, AsyncIterator, Callable, Iterator, cast, final
from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
import pandas as pd
from lightrag.kg import (
STORAGE_ENV_REQUIREMENTS,
@@ -1111,6 +1114,7 @@ class LightRAG:
# Prepare node data
node_data: dict[str, str] = {
"entity_id": entity_name,
"entity_type": entity_type,
"description": description,
"source_id": source_id,
@@ -1148,6 +1152,7 @@ class LightRAG:
await self.chunk_entity_relation_graph.upsert_node(
need_insert_id,
node_data={
"entity_id": need_insert_id,
"source_id": source_id,
"description": "UNKNOWN",
"entity_type": "UNKNOWN",
@@ -1838,9 +1843,10 @@ class LightRAG:
"""
try:
# 1. Get current entity information
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
if not node_data:
node_exists = await self.chunk_entity_relation_graph.has_node(entity_name)
if not node_exists:
raise ValueError(f"Entity '{entity_name}' does not exist")
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
# Check if entity is being renamed
new_entity_name = updated_data.get("entity_name", entity_name)
@@ -1853,7 +1859,7 @@ class LightRAG:
"Entity renaming is not allowed. Set allow_rename=True to enable this feature"
)
existing_node = await self.chunk_entity_relation_graph.get_node(
existing_node = await self.chunk_entity_relation_graph.has_node(
new_entity_name
)
if existing_node:
@@ -2035,14 +2041,16 @@ class LightRAG:
"""
try:
# 1. Get current relation information
edge_data = await self.chunk_entity_relation_graph.get_edge(
edge_exists = await self.chunk_entity_relation_graph.has_edge(
source_entity, target_entity
)
if not edge_data:
if not edge_exists:
raise ValueError(
f"Relation from '{source_entity}' to '{target_entity}' does not exist"
)
edge_data = await self.chunk_entity_relation_graph.get_edge(
source_entity, target_entity
)
# Important: First delete the old relation record from the vector database
old_relation_id = compute_mdhash_id(
source_entity + target_entity, prefix="rel-"
@@ -2151,12 +2159,13 @@ class LightRAG:
"""
try:
# Check if entity already exists
existing_node = await self.chunk_entity_relation_graph.get_node(entity_name)
existing_node = await self.chunk_entity_relation_graph.has_node(entity_name)
if existing_node:
raise ValueError(f"Entity '{entity_name}' already exists")
# Prepare node data with defaults if missing
node_data = {
"entity_id": entity_name,
"entity_type": entity_data.get("entity_type", "UNKNOWN"),
"description": entity_data.get("description", ""),
"source_id": entity_data.get("source_id", "manual"),
@@ -2244,7 +2253,7 @@ class LightRAG:
raise ValueError(f"Target entity '{target_entity}' does not exist")
# Check if relation already exists
existing_edge = await self.chunk_entity_relation_graph.get_edge(
existing_edge = await self.chunk_entity_relation_graph.has_edge(
source_entity, target_entity
)
if existing_edge:
@@ -2377,19 +2386,22 @@ class LightRAG:
# 1. Check if all source entities exist
source_entities_data = {}
for entity_name in source_entities:
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
if not node_data:
node_exists = await self.chunk_entity_relation_graph.has_node(
entity_name
)
if not node_exists:
raise ValueError(f"Source entity '{entity_name}' does not exist")
node_data = await self.chunk_entity_relation_graph.get_node(entity_name)
source_entities_data[entity_name] = node_data
# 2. Check if target entity exists and get its data if it does
target_exists = await self.chunk_entity_relation_graph.has_node(
target_entity
)
target_entity_data = {}
existing_target_entity_data = {}
if target_exists:
target_entity_data = await self.chunk_entity_relation_graph.get_node(
target_entity
existing_target_entity_data = (
await self.chunk_entity_relation_graph.get_node(target_entity)
)
logger.info(
f"Target entity '{target_entity}' already exists, will merge data"
@@ -2398,7 +2410,7 @@ class LightRAG:
# 3. Merge entity data
merged_entity_data = self._merge_entity_attributes(
list(source_entities_data.values())
+ ([target_entity_data] if target_exists else []),
+ ([existing_target_entity_data] if target_exists else []),
merge_strategy,
)
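A recurring change in this file replaces "fetch, then truth-test the payload" with an explicit has_node/has_edge check before get_node/get_edge, so an entity whose stored data happens to be empty is no longer mistaken for a missing one. A condensed sketch of the pattern (the graph object and entity name are placeholders):

async def get_required_entity(graph, entity_name: str) -> dict:
    """Fetch an entity's node data, failing loudly if the entity is absent."""
    # Existence is decided by has_node, not by the truthiness of the payload.
    if not await graph.has_node(entity_name):
        raise ValueError(f"Entity '{entity_name}' does not exist")
    return await graph.get_node(entity_name)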
@@ -2592,6 +2604,322 @@ class LightRAG:
logger.error(f"Error merging entities: {e}")
raise
async def aexport_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Asynchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
include_vector_data: Whether to include data from the vector database.
"""
# Collect data
entities_data = []
relations_data = []
relationships_data = []
# --- Entities ---
all_entities = await self.chunk_entity_relation_graph.get_all_labels()
for entity_name in all_entities:
entity_info = await self.get_entity_info(
entity_name, include_vector_data=include_vector_data
)
entity_row = {
"entity_name": entity_name,
"source_id": entity_info["source_id"],
"graph_data": str(
entity_info["graph_data"]
), # Convert to string to ensure compatibility
}
if include_vector_data and "vector_data" in entity_info:
entity_row["vector_data"] = str(entity_info["vector_data"])
entities_data.append(entity_row)
# --- Relations ---
for src_entity in all_entities:
for tgt_entity in all_entities:
if src_entity == tgt_entity:
continue
edge_exists = await self.chunk_entity_relation_graph.has_edge(
src_entity, tgt_entity
)
if edge_exists:
relation_info = await self.get_relation_info(
src_entity, tgt_entity, include_vector_data=include_vector_data
)
relation_row = {
"src_entity": src_entity,
"tgt_entity": tgt_entity,
"source_id": relation_info["source_id"],
"graph_data": str(
relation_info["graph_data"]
), # Convert to string
}
if include_vector_data and "vector_data" in relation_info:
relation_row["vector_data"] = str(relation_info["vector_data"])
relations_data.append(relation_row)
# --- Relationships (from VectorDB) ---
all_relationships = await self.relationships_vdb.client_storage
for rel in all_relationships["data"]:
relationships_data.append(
{
"relationship_id": rel["__id__"],
"data": str(rel), # Convert to string for compatibility
}
)
# Export based on format
if file_format == "csv":
# CSV export
with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
# Entities
if entities_data:
csvfile.write("# ENTITIES\n")
writer = csv.DictWriter(csvfile, fieldnames=entities_data[0].keys())
writer.writeheader()
writer.writerows(entities_data)
csvfile.write("\n\n")
# Relations
if relations_data:
csvfile.write("# RELATIONS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relations_data[0].keys()
)
writer.writeheader()
writer.writerows(relations_data)
csvfile.write("\n\n")
# Relationships
if relationships_data:
csvfile.write("# RELATIONSHIPS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relationships_data[0].keys()
)
writer.writeheader()
writer.writerows(relationships_data)
elif file_format == "excel":
# Excel export
entities_df = (
pd.DataFrame(entities_data) if entities_data else pd.DataFrame()
)
relations_df = (
pd.DataFrame(relations_data) if relations_data else pd.DataFrame()
)
relationships_df = (
pd.DataFrame(relationships_data)
if relationships_data
else pd.DataFrame()
)
with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
if not entities_df.empty:
entities_df.to_excel(writer, sheet_name="Entities", index=False)
if not relations_df.empty:
relations_df.to_excel(writer, sheet_name="Relations", index=False)
if not relationships_df.empty:
relationships_df.to_excel(
writer, sheet_name="Relationships", index=False
)
elif file_format == "md":
# Markdown export
with open(output_path, "w", encoding="utf-8") as mdfile:
mdfile.write("# LightRAG Data Export\n\n")
# Entities
mdfile.write("## Entities\n\n")
if entities_data:
# Write header
mdfile.write("| " + " | ".join(entities_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(entities_data[0].keys()))
+ " |\n"
)
# Write rows
for entity in entities_data:
mdfile.write(
"| " + " | ".join(str(v) for v in entity.values()) + " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No entity data available*\n\n")
# Relations
mdfile.write("## Relations\n\n")
if relations_data:
# Write header
mdfile.write("| " + " | ".join(relations_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(relations_data[0].keys()))
+ " |\n"
)
# Write rows
for relation in relations_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relation.values())
+ " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No relation data available*\n\n")
# Relationships
mdfile.write("## Relationships\n\n")
if relationships_data:
# Write header
mdfile.write(
"| " + " | ".join(relationships_data[0].keys()) + " |\n"
)
mdfile.write(
"| "
+ " | ".join(["---"] * len(relationships_data[0].keys()))
+ " |\n"
)
# Write rows
for relationship in relationships_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relationship.values())
+ " |\n"
)
else:
mdfile.write("*No relationship data available*\n\n")
elif file_format == "txt":
# Plain text export
with open(output_path, "w", encoding="utf-8") as txtfile:
txtfile.write("LIGHTRAG DATA EXPORT\n")
txtfile.write("=" * 80 + "\n\n")
# Entities
txtfile.write("ENTITIES\n")
txtfile.write("-" * 80 + "\n")
if entities_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(e[k])) for e in entities_data))
for k in entities_data[0]
}
header = " ".join(k.ljust(col_widths[k]) for k in entities_data[0])
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for entity in entities_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in entity.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No entity data available\n\n")
# Relations
txtfile.write("RELATIONS\n")
txtfile.write("-" * 80 + "\n")
if relations_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relations_data))
for k in relations_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relations_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relation in relations_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relation.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No relation data available\n\n")
# Relationships
txtfile.write("RELATIONSHIPS\n")
txtfile.write("-" * 80 + "\n")
if relationships_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relationships_data))
for k in relationships_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relationships_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relationship in relationships_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relationship.items()
)
txtfile.write(row + "\n")
else:
txtfile.write("No relationship data available\n\n")
else:
raise ValueError(
f"Unsupported file format: {file_format}. "
f"Choose from: csv, excel, md, txt"
)
print(f"Data exported to: {output_path} with format: {file_format}")
def export_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Synchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
include_vector_data: Whether to include data from the vector database.
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
self.aexport_data(output_path, file_format, include_vector_data)
)
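The new export API ships as an async method (aexport_data) plus a synchronous wrapper (export_data) that drives it through an event loop. A brief usage sketch, assuming an already-initialized LightRAG instance named rag:

# Write entities, relations, and relationships to a single CSV file.
rag.export_data("graph_export.csv", file_format="csv")

# Excel export with vector-store payloads included as extra columns.
rag.export_data("graph_export.xlsx", file_format="excel", include_vector_data=True)

# Inside a running event loop, call the async variant directly instead:
# await rag.aexport_data("graph_export.md", file_format="md")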
def merge_entities(
self,
source_entities: list[str],

View File

@@ -76,6 +76,7 @@ class LightragPathFilter(logging.Filter):
super().__init__()
# Define paths to be filtered
self.filtered_paths = ["/documents", "/health", "/webui/"]
# self.filtered_paths = ["/health", "/webui/"]
def filter(self, record):
try: