Merge pull request #1519 from danielaskdd/fix-json-postgres
Fix JSON handling error for PostgreSQL graph storage
This commit is contained in:
@@ -1178,7 +1178,7 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
with_age=True,
|
||||
graph_name=self.graph_name,
|
||||
)
|
||||
logger.info(f"Successfully executed: {query}")
|
||||
# logger.info(f"Successfully executed: {query}")
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
@@ -1373,6 +1373,15 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
node = record[0]
|
||||
node_dict = node["n"]["properties"]
|
||||
|
||||
# Process string result, parse it to JSON dictionary
|
||||
if isinstance(node_dict, str):
|
||||
try:
|
||||
import json
|
||||
|
||||
node_dict = json.loads(node_dict)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Failed to parse node string: {node_dict}")
|
||||
|
||||
return node_dict
|
||||
return None
|
||||
|
||||
@@ -1421,6 +1430,15 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
if record and record[0] and record[0]["edge_properties"]:
|
||||
result = record[0]["edge_properties"]
|
||||
|
||||
# Process string result, parse it to JSON dictionary
|
||||
if isinstance(result, str):
|
||||
try:
|
||||
import json
|
||||
|
||||
result = json.loads(result)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Failed to parse edge string: {result}")
|
||||
|
||||
return result
|
||||
|
||||
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
|
||||
@@ -1432,9 +1450,9 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
|
||||
query = """SELECT * FROM cypher('%s', $$
|
||||
MATCH (n:base {entity_id: "%s"})
|
||||
OPTIONAL MATCH (n)-[]-(connected)
|
||||
RETURN n, connected
|
||||
$$) AS (n agtype, connected agtype)""" % (
|
||||
OPTIONAL MATCH (n)-[]-(connected:base)
|
||||
RETURN n.entity_id AS source_id, connected.entity_id AS connected_id
|
||||
$$) AS (source_id text, connected_id text)""" % (
|
||||
self.graph_name,
|
||||
label,
|
||||
)
|
||||
@@ -1442,20 +1460,11 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
results = await self._query(query)
|
||||
edges = []
|
||||
for record in results:
|
||||
source_node = record["n"] if record["n"] else None
|
||||
connected_node = record["connected"] if record["connected"] else None
|
||||
source_id = record["source_id"]
|
||||
connected_id = record["connected_id"]
|
||||
|
||||
if (
|
||||
source_node
|
||||
and connected_node
|
||||
and "properties" in source_node
|
||||
and "properties" in connected_node
|
||||
):
|
||||
source_label = source_node["properties"].get("entity_id")
|
||||
target_label = connected_node["properties"].get("entity_id")
|
||||
|
||||
if source_label and target_label:
|
||||
edges.append((source_label, target_label))
|
||||
if source_id and connected_id:
|
||||
edges.append((source_id, connected_id))
|
||||
|
||||
return edges
|
||||
|
||||
@@ -1638,6 +1647,18 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
for result in results:
|
||||
if result["node_id"] and result["n"]:
|
||||
node_dict = result["n"]["properties"]
|
||||
|
||||
# Process string result, parse it to JSON dictionary
|
||||
if isinstance(node_dict, str):
|
||||
try:
|
||||
import json
|
||||
|
||||
node_dict = json.loads(node_dict)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
f"Failed to parse node string in batch: {node_dict}"
|
||||
)
|
||||
|
||||
# Remove the 'base' label if present in a 'labels' property
|
||||
if "labels" in node_dict:
|
||||
node_dict["labels"] = [
|
||||
@@ -1789,15 +1810,39 @@ class PGGraphStorage(BaseGraphStorage):
|
||||
|
||||
for result in forward_results:
|
||||
if result["source"] and result["target"] and result["edge_properties"]:
|
||||
edges_dict[(result["source"], result["target"])] = result[
|
||||
"edge_properties"
|
||||
]
|
||||
edge_props = result["edge_properties"]
|
||||
|
||||
# Process string result, parse it to JSON dictionary
|
||||
if isinstance(edge_props, str):
|
||||
try:
|
||||
import json
|
||||
|
||||
edge_props = json.loads(edge_props)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
f"Failed to parse edge properties string: {edge_props}"
|
||||
)
|
||||
continue
|
||||
|
||||
edges_dict[(result["source"], result["target"])] = edge_props
|
||||
|
||||
for result in backward_results:
|
||||
if result["source"] and result["target"] and result["edge_properties"]:
|
||||
edges_dict[(result["source"], result["target"])] = result[
|
||||
"edge_properties"
|
||||
]
|
||||
edge_props = result["edge_properties"]
|
||||
|
||||
# Process string result, parse it to JSON dictionary
|
||||
if isinstance(edge_props, str):
|
||||
try:
|
||||
import json
|
||||
|
||||
edge_props = json.loads(edge_props)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
f"Failed to parse edge properties string: {edge_props}"
|
||||
)
|
||||
continue
|
||||
|
||||
edges_dict[(result["source"], result["target"])] = edge_props
|
||||
|
||||
return edges_dict
|
||||
|
||||
|
@@ -994,10 +994,14 @@ class LightRAG:
|
||||
|
||||
except Exception as e:
|
||||
# Log error and update pipeline status
|
||||
error_msg = f"Failed to extrat document {doc_id}: {traceback.format_exc()}"
|
||||
logger.error(traceback.format_exc())
|
||||
error_msg = f"Failed to extrat document {current_file_number}/{total_files}: {file_path}"
|
||||
logger.error(error_msg)
|
||||
async with pipeline_status_lock:
|
||||
pipeline_status["latest_message"] = error_msg
|
||||
pipeline_status["history_messages"].append(
|
||||
traceback.format_exc()
|
||||
)
|
||||
pipeline_status["history_messages"].append(error_msg)
|
||||
|
||||
# Cancel other tasks as they are no longer meaningful
|
||||
@@ -1080,10 +1084,14 @@ class LightRAG:
|
||||
|
||||
except Exception as e:
|
||||
# Log error and update pipeline status
|
||||
error_msg = f"Merging stage failed in document {doc_id}: {traceback.format_exc()}"
|
||||
logger.error(traceback.format_exc())
|
||||
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
|
||||
logger.error(error_msg)
|
||||
async with pipeline_status_lock:
|
||||
pipeline_status["latest_message"] = error_msg
|
||||
pipeline_status["history_messages"].append(
|
||||
traceback.format_exc()
|
||||
)
|
||||
pipeline_status["history_messages"].append(error_msg)
|
||||
|
||||
# Persistent llm cache
|
||||
|
Reference in New Issue
Block a user