Fix JSON handling error for PostgreSQL graph storage

This commit is contained in:
yangdx
2025-05-04 22:18:56 +08:00
parent 1213f53fc9
commit dcb2a72462

View File

@@ -1372,6 +1372,14 @@ class PGGraphStorage(BaseGraphStorage):
if record: if record:
node = record[0] node = record[0]
node_dict = node["n"]["properties"] node_dict = node["n"]["properties"]
# Process string result, parse it to JSON dictionary
if isinstance(node_dict, str):
try:
import json
node_dict = json.loads(node_dict)
except json.JSONDecodeError:
logger.warning(f"Failed to parse node string: {node_dict}")
return node_dict return node_dict
return None return None
@@ -1420,7 +1428,15 @@ class PGGraphStorage(BaseGraphStorage):
record = await self._query(query) record = await self._query(query)
if record and record[0] and record[0]["edge_properties"]: if record and record[0] and record[0]["edge_properties"]:
result = record[0]["edge_properties"] result = record[0]["edge_properties"]
# Process string result, parse it to JSON dictionary
if isinstance(result, str):
try:
import json
result = json.loads(result)
except json.JSONDecodeError:
logger.warning(f"Failed to parse edge string: {result}")
return result return result
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@@ -1432,9 +1448,9 @@ class PGGraphStorage(BaseGraphStorage):
query = """SELECT * FROM cypher('%s', $$ query = """SELECT * FROM cypher('%s', $$
MATCH (n:base {entity_id: "%s"}) MATCH (n:base {entity_id: "%s"})
OPTIONAL MATCH (n)-[]-(connected) OPTIONAL MATCH (n)-[]-(connected:base)
RETURN n, connected RETURN n.entity_id AS source_id, connected.entity_id AS connected_id
$$) AS (n agtype, connected agtype)""" % ( $$) AS (source_id text, connected_id text)""" % (
self.graph_name, self.graph_name,
label, label,
) )
@@ -1442,20 +1458,11 @@ class PGGraphStorage(BaseGraphStorage):
results = await self._query(query) results = await self._query(query)
edges = [] edges = []
for record in results: for record in results:
source_node = record["n"] if record["n"] else None source_id = record["source_id"]
connected_node = record["connected"] if record["connected"] else None connected_id = record["connected_id"]
if ( if source_id and connected_id:
source_node edges.append((source_id, connected_id))
and connected_node
and "properties" in source_node
and "properties" in connected_node
):
source_label = source_node["properties"].get("entity_id")
target_label = connected_node["properties"].get("entity_id")
if source_label and target_label:
edges.append((source_label, target_label))
return edges return edges
@@ -1638,6 +1645,15 @@ class PGGraphStorage(BaseGraphStorage):
for result in results: for result in results:
if result["node_id"] and result["n"]: if result["node_id"] and result["n"]:
node_dict = result["n"]["properties"] node_dict = result["n"]["properties"]
# Process string result, parse it to JSON dictionary
if isinstance(node_dict, str):
try:
import json
node_dict = json.loads(node_dict)
except json.JSONDecodeError:
logger.warning(f"Failed to parse node string in batch: {node_dict}")
# Remove the 'base' label if present in a 'labels' property # Remove the 'base' label if present in a 'labels' property
if "labels" in node_dict: if "labels" in node_dict:
node_dict["labels"] = [ node_dict["labels"] = [
@@ -1789,15 +1805,33 @@ class PGGraphStorage(BaseGraphStorage):
for result in forward_results: for result in forward_results:
if result["source"] and result["target"] and result["edge_properties"]: if result["source"] and result["target"] and result["edge_properties"]:
edges_dict[(result["source"], result["target"])] = result[ edge_props = result["edge_properties"]
"edge_properties"
] # Process string result, parse it to JSON dictionary
if isinstance(edge_props, str):
try:
import json
edge_props = json.loads(edge_props)
except json.JSONDecodeError:
logger.warning(f"Failed to parse edge properties string: {edge_props}")
continue
edges_dict[(result["source"], result["target"])] = edge_props
for result in backward_results: for result in backward_results:
if result["source"] and result["target"] and result["edge_properties"]: if result["source"] and result["target"] and result["edge_properties"]:
edges_dict[(result["source"], result["target"])] = result[ edge_props = result["edge_properties"]
"edge_properties"
] # Process string result, parse it to JSON dictionary
if isinstance(edge_props, str):
try:
import json
edge_props = json.loads(edge_props)
except json.JSONDecodeError:
logger.warning(f"Failed to parse edge properties string: {edge_props}")
continue
edges_dict[(result["source"], result["target"])] = edge_props
return edges_dict return edges_dict