Fixed lint and Added new imports at the top of the file

This commit is contained in:
Rushi Chaganti
2025-03-12 00:04:23 +05:30
parent 2ffd7f9111
commit 39633cb1d9
3 changed files with 493 additions and 70 deletions

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import asyncio
import configparser
import os
import csv
import warnings
from dataclasses import asdict, dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, AsyncIterator, Callable, Iterator, cast, final
from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
import pandas as pd
from lightrag.kg import (
STORAGE_ENV_REQUIREMENTS,
@@ -2592,6 +2595,322 @@ class LightRAG:
logger.error(f"Error merging entities: {e}")
raise
async def aexport_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Asynchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
- table: Print formatted tables to console
include_vector_data: Whether to include data from the vector database.
"""
# Collect data
entities_data = []
relations_data = []
relationships_data = []
# --- Entities ---
all_entities = await self.chunk_entity_relation_graph.get_all_labels()
for entity_name in all_entities:
entity_info = await self.get_entity_info(
entity_name, include_vector_data=include_vector_data
)
entity_row = {
"entity_name": entity_name,
"source_id": entity_info["source_id"],
"graph_data": str(
entity_info["graph_data"]
), # Convert to string to ensure compatibility
}
if include_vector_data and "vector_data" in entity_info:
entity_row["vector_data"] = str(entity_info["vector_data"])
entities_data.append(entity_row)
# --- Relations ---
for src_entity in all_entities:
for tgt_entity in all_entities:
if src_entity == tgt_entity:
continue
edge_exists = await self.chunk_entity_relation_graph.has_edge(
src_entity, tgt_entity
)
if edge_exists:
relation_info = await self.get_relation_info(
src_entity, tgt_entity, include_vector_data=include_vector_data
)
relation_row = {
"src_entity": src_entity,
"tgt_entity": tgt_entity,
"source_id": relation_info["source_id"],
"graph_data": str(
relation_info["graph_data"]
), # Convert to string
}
if include_vector_data and "vector_data" in relation_info:
relation_row["vector_data"] = str(relation_info["vector_data"])
relations_data.append(relation_row)
# --- Relationships (from VectorDB) ---
all_relationships = await self.relationships_vdb.client_storage
for rel in all_relationships["data"]:
relationships_data.append(
{
"relationship_id": rel["__id__"],
"data": str(rel), # Convert to string for compatibility
}
)
# Export based on format
if file_format == "csv":
# CSV export
with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
# Entities
if entities_data:
csvfile.write("# ENTITIES\n")
writer = csv.DictWriter(csvfile, fieldnames=entities_data[0].keys())
writer.writeheader()
writer.writerows(entities_data)
csvfile.write("\n\n")
# Relations
if relations_data:
csvfile.write("# RELATIONS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relations_data[0].keys()
)
writer.writeheader()
writer.writerows(relations_data)
csvfile.write("\n\n")
# Relationships
if relationships_data:
csvfile.write("# RELATIONSHIPS\n")
writer = csv.DictWriter(
csvfile, fieldnames=relationships_data[0].keys()
)
writer.writeheader()
writer.writerows(relationships_data)
elif file_format == "excel":
# Excel export
entities_df = (
pd.DataFrame(entities_data) if entities_data else pd.DataFrame()
)
relations_df = (
pd.DataFrame(relations_data) if relations_data else pd.DataFrame()
)
relationships_df = (
pd.DataFrame(relationships_data)
if relationships_data
else pd.DataFrame()
)
with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
if not entities_df.empty:
entities_df.to_excel(writer, sheet_name="Entities", index=False)
if not relations_df.empty:
relations_df.to_excel(writer, sheet_name="Relations", index=False)
if not relationships_df.empty:
relationships_df.to_excel(
writer, sheet_name="Relationships", index=False
)
elif file_format == "md":
# Markdown export
with open(output_path, "w", encoding="utf-8") as mdfile:
mdfile.write("# LightRAG Data Export\n\n")
# Entities
mdfile.write("## Entities\n\n")
if entities_data:
# Write header
mdfile.write("| " + " | ".join(entities_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(entities_data[0].keys()))
+ " |\n"
)
# Write rows
for entity in entities_data:
mdfile.write(
"| " + " | ".join(str(v) for v in entity.values()) + " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No entity data available*\n\n")
# Relations
mdfile.write("## Relations\n\n")
if relations_data:
# Write header
mdfile.write("| " + " | ".join(relations_data[0].keys()) + " |\n")
mdfile.write(
"| "
+ " | ".join(["---"] * len(relations_data[0].keys()))
+ " |\n"
)
# Write rows
for relation in relations_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relation.values())
+ " |\n"
)
mdfile.write("\n\n")
else:
mdfile.write("*No relation data available*\n\n")
# Relationships
mdfile.write("## Relationships\n\n")
if relationships_data:
# Write header
mdfile.write(
"| " + " | ".join(relationships_data[0].keys()) + " |\n"
)
mdfile.write(
"| "
+ " | ".join(["---"] * len(relationships_data[0].keys()))
+ " |\n"
)
# Write rows
for relationship in relationships_data:
mdfile.write(
"| "
+ " | ".join(str(v) for v in relationship.values())
+ " |\n"
)
else:
mdfile.write("*No relationship data available*\n\n")
elif file_format == "txt":
# Plain text export
with open(output_path, "w", encoding="utf-8") as txtfile:
txtfile.write("LIGHTRAG DATA EXPORT\n")
txtfile.write("=" * 80 + "\n\n")
# Entities
txtfile.write("ENTITIES\n")
txtfile.write("-" * 80 + "\n")
if entities_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(e[k])) for e in entities_data))
for k in entities_data[0]
}
header = " ".join(k.ljust(col_widths[k]) for k in entities_data[0])
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for entity in entities_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in entity.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No entity data available\n\n")
# Relations
txtfile.write("RELATIONS\n")
txtfile.write("-" * 80 + "\n")
if relations_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relations_data))
for k in relations_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relations_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relation in relations_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relation.items()
)
txtfile.write(row + "\n")
txtfile.write("\n\n")
else:
txtfile.write("No relation data available\n\n")
# Relationships
txtfile.write("RELATIONSHIPS\n")
txtfile.write("-" * 80 + "\n")
if relationships_data:
# Create fixed width columns
col_widths = {
k: max(len(k), max(len(str(r[k])) for r in relationships_data))
for k in relationships_data[0]
}
header = " ".join(
k.ljust(col_widths[k]) for k in relationships_data[0]
)
txtfile.write(header + "\n")
txtfile.write("-" * len(header) + "\n")
# Write rows
for relationship in relationships_data:
row = " ".join(
str(v).ljust(col_widths[k]) for k, v in relationship.items()
)
txtfile.write(row + "\n")
else:
txtfile.write("No relationship data available\n\n")
else:
raise ValueError(
f"Unsupported file format: {file_format}. "
f"Choose from: csv, excel, md, txt"
)
if file_format is not None:
print(f"Data exported to: {output_path} with format: {file_format}")
else:
print("Data displayed as table format")
def export_data(
self,
output_path: str,
file_format: Literal["csv", "excel", "md", "txt"] = "csv",
include_vector_data: bool = False,
) -> None:
"""
Synchronously exports all entities, relations, and relationships to various formats.
Args:
output_path: The path to the output file (including extension).
file_format: Output format - "csv", "excel", "md", "txt".
- csv: Comma-separated values file
- excel: Microsoft Excel file with multiple sheets
- md: Markdown tables
- txt: Plain text formatted output
- table: Print formatted tables to console
include_vector_data: Whether to include data from the vector database.
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
self.aexport_data(output_path, file_format, include_vector_data)
)
def merge_entities(
self,
source_entities: list[str],