From dcb2a724628cd484e34caa8cd56157365c313b57 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 4 May 2025 22:18:56 +0800 Subject: [PATCH 1/3] Fix JSON handling error for PostgreSQL graph storage --- lightrag/kg/postgres_impl.py | 80 +++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 7dcd969b..b86c226b 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1372,6 +1372,14 @@ class PGGraphStorage(BaseGraphStorage): if record: node = record[0] node_dict = node["n"]["properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(node_dict, str): + try: + import json + node_dict = json.loads(node_dict) + except json.JSONDecodeError: + logger.warning(f"Failed to parse node string: {node_dict}") return node_dict return None @@ -1420,7 +1428,15 @@ class PGGraphStorage(BaseGraphStorage): record = await self._query(query) if record and record[0] and record[0]["edge_properties"]: result = record[0]["edge_properties"] - + + # Process string result, parse it to JSON dictionary + if isinstance(result, str): + try: + import json + result = json.loads(result) + except json.JSONDecodeError: + logger.warning(f"Failed to parse edge string: {result}") + return result async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: @@ -1432,9 +1448,9 @@ class PGGraphStorage(BaseGraphStorage): query = """SELECT * FROM cypher('%s', $$ MATCH (n:base {entity_id: "%s"}) - OPTIONAL MATCH (n)-[]-(connected) - RETURN n, connected - $$) AS (n agtype, connected agtype)""" % ( + OPTIONAL MATCH (n)-[]-(connected:base) + RETURN n.entity_id AS source_id, connected.entity_id AS connected_id + $$) AS (source_id text, connected_id text)""" % ( self.graph_name, label, ) @@ -1442,20 +1458,11 @@ class PGGraphStorage(BaseGraphStorage): results = await self._query(query) edges = [] for record in results: - source_node = record["n"] if record["n"] else None - connected_node = record["connected"] if record["connected"] else None + source_id = record["source_id"] + connected_id = record["connected_id"] - if ( - source_node - and connected_node - and "properties" in source_node - and "properties" in connected_node - ): - source_label = source_node["properties"].get("entity_id") - target_label = connected_node["properties"].get("entity_id") - - if source_label and target_label: - edges.append((source_label, target_label)) + if source_id and connected_id: + edges.append((source_id, connected_id)) return edges @@ -1638,6 +1645,15 @@ class PGGraphStorage(BaseGraphStorage): for result in results: if result["node_id"] and result["n"]: node_dict = result["n"]["properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(node_dict, str): + try: + import json + node_dict = json.loads(node_dict) + except json.JSONDecodeError: + logger.warning(f"Failed to parse node string in batch: {node_dict}") + # Remove the 'base' label if present in a 'labels' property if "labels" in node_dict: node_dict["labels"] = [ @@ -1789,15 +1805,33 @@ class PGGraphStorage(BaseGraphStorage): for result in forward_results: if result["source"] and result["target"] and result["edge_properties"]: - edges_dict[(result["source"], result["target"])] = result[ - "edge_properties" - ] + edge_props = result["edge_properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(edge_props, str): + try: + import json + edge_props = json.loads(edge_props) + except json.JSONDecodeError: + logger.warning(f"Failed to parse edge properties string: {edge_props}") + continue + + edges_dict[(result["source"], result["target"])] = edge_props for result in backward_results: if result["source"] and result["target"] and result["edge_properties"]: - edges_dict[(result["source"], result["target"])] = result[ - "edge_properties" - ] + edge_props = result["edge_properties"] + + # Process string result, parse it to JSON dictionary + if isinstance(edge_props, str): + try: + import json + edge_props = json.loads(edge_props) + except json.JSONDecodeError: + logger.warning(f"Failed to parse edge properties string: {edge_props}") + continue + + edges_dict[(result["source"], result["target"])] = edge_props return edges_dict From 9a41de51fb45972abf87bf6210de4c196f4ac543 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 4 May 2025 22:20:44 +0800 Subject: [PATCH 2/3] Optimize log message --- lightrag/kg/postgres_impl.py | 35 +++++++++++++++++++++++------------ lightrag/lightrag.py | 12 ++++++++++-- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index b86c226b..3d2955bb 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1372,11 +1372,12 @@ class PGGraphStorage(BaseGraphStorage): if record: node = record[0] node_dict = node["n"]["properties"] - + # Process string result, parse it to JSON dictionary if isinstance(node_dict, str): try: import json + node_dict = json.loads(node_dict) except json.JSONDecodeError: logger.warning(f"Failed to parse node string: {node_dict}") @@ -1428,15 +1429,16 @@ class PGGraphStorage(BaseGraphStorage): record = await self._query(query) if record and record[0] and record[0]["edge_properties"]: result = record[0]["edge_properties"] - + # Process string result, parse it to JSON dictionary if isinstance(result, str): try: import json + result = json.loads(result) except json.JSONDecodeError: logger.warning(f"Failed to parse edge string: {result}") - + return result async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: @@ -1645,15 +1647,18 @@ class PGGraphStorage(BaseGraphStorage): for result in results: if result["node_id"] and result["n"]: node_dict = result["n"]["properties"] - + # Process string result, parse it to JSON dictionary if isinstance(node_dict, str): try: import json + node_dict = json.loads(node_dict) except json.JSONDecodeError: - logger.warning(f"Failed to parse node string in batch: {node_dict}") - + logger.warning( + f"Failed to parse node string in batch: {node_dict}" + ) + # Remove the 'base' label if present in a 'labels' property if "labels" in node_dict: node_dict["labels"] = [ @@ -1806,31 +1811,37 @@ class PGGraphStorage(BaseGraphStorage): for result in forward_results: if result["source"] and result["target"] and result["edge_properties"]: edge_props = result["edge_properties"] - + # Process string result, parse it to JSON dictionary if isinstance(edge_props, str): try: import json + edge_props = json.loads(edge_props) except json.JSONDecodeError: - logger.warning(f"Failed to parse edge properties string: {edge_props}") + logger.warning( + f"Failed to parse edge properties string: {edge_props}" + ) continue - + edges_dict[(result["source"], result["target"])] = edge_props for result in backward_results: if result["source"] and result["target"] and result["edge_properties"]: edge_props = result["edge_properties"] - + # Process string result, parse it to JSON dictionary if isinstance(edge_props, str): try: import json + edge_props = json.loads(edge_props) except json.JSONDecodeError: - logger.warning(f"Failed to parse edge properties string: {edge_props}") + logger.warning( + f"Failed to parse edge properties string: {edge_props}" + ) continue - + edges_dict[(result["source"], result["target"])] = edge_props return edges_dict diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 080c02bc..17d36166 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -994,10 +994,14 @@ class LightRAG: except Exception as e: # Log error and update pipeline status - error_msg = f"Failed to extrat document {doc_id}: {traceback.format_exc()}" + logger.error(traceback.format_exc()) + error_msg = f"Failed to extrat document {current_file_number}/{total_files}: {file_path}" logger.error(error_msg) async with pipeline_status_lock: pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) pipeline_status["history_messages"].append(error_msg) # Cancel other tasks as they are no longer meaningful @@ -1080,10 +1084,14 @@ class LightRAG: except Exception as e: # Log error and update pipeline status - error_msg = f"Merging stage failed in document {doc_id}: {traceback.format_exc()}" + logger.error(traceback.format_exc()) + error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}" logger.error(error_msg) async with pipeline_status_lock: pipeline_status["latest_message"] = error_msg + pipeline_status["history_messages"].append( + traceback.format_exc() + ) pipeline_status["history_messages"].append(error_msg) # Persistent llm cache From e46a4b20793297664aa8eb2e21509bb267f05efc Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 4 May 2025 22:31:57 +0800 Subject: [PATCH 3/3] Optimize log message --- lightrag/kg/postgres_impl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 3d2955bb..44610eaf 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -1178,7 +1178,7 @@ class PGGraphStorage(BaseGraphStorage): with_age=True, graph_name=self.graph_name, ) - logger.info(f"Successfully executed: {query}") + # logger.info(f"Successfully executed: {query}") except Exception: continue