Add duplicate edge upsert checking and logging

This commit is contained in:
yangdx
2025-03-08 11:36:24 +08:00
parent 22a93fb717
commit fb4a4c736e

View File

@@ -412,9 +412,7 @@ class Neo4JStorage(BaseGraphStorage):
result = await session.run(query) result = await session.run(query)
try: try:
records = await result.fetch( records = await result.fetch(2)
2
) # Get up to 2 records to check for duplicates
if len(records) > 1: if len(records) > 1:
logger.warning( logger.warning(
@@ -552,20 +550,20 @@ class Neo4JStorage(BaseGraphStorage):
label = self._ensure_label(node_id) label = self._ensure_label(node_id)
properties = node_data properties = node_data
async def _do_upsert(tx: AsyncManagedTransaction):
query = f"""
MERGE (n:`{label}`)
SET n += $properties
"""
result = await tx.run(query, properties=properties)
logger.debug(
f"Upserted node with label '{label}' and properties: {properties}"
)
await result.consume() # Ensure result is fully consumed
try: try:
async with self._driver.session(database=self._DATABASE) as session: async with self._driver.session(database=self._DATABASE) as session:
await session.execute_write(_do_upsert) async def execute_upsert(tx: AsyncManagedTransaction):
query = f"""
MERGE (n:`{label}`)
SET n += $properties
"""
result = await tx.run(query, properties=properties)
logger.debug(
f"Upserted node with label '{label}' and properties: {properties}"
)
await result.consume() # Ensure result is fully consumed
await session.execute_write(execute_upsert)
except Exception as e: except Exception as e:
logger.error(f"Error during upsert: {str(e)}") logger.error(f"Error during upsert: {str(e)}")
raise raise
@@ -614,27 +612,39 @@ class Neo4JStorage(BaseGraphStorage):
f"Neo4j: target node with label '{target_label}' does not exist" f"Neo4j: target node with label '{target_label}' does not exist"
) )
async def _do_upsert_edge(tx: AsyncManagedTransaction):
query = f"""
MATCH (source:`{source_label}`)
WITH source
MATCH (target:`{target_label}`)
MERGE (source)-[r:DIRECTED]-(target)
SET r += $properties
RETURN r
"""
result = await tx.run(query, properties=edge_properties)
try:
record = await result.single()
logger.debug(
f"Upserted edge from '{source_label}' to '{target_label}' with properties: {edge_properties}, result: {record['r'] if record else None}"
)
finally:
await result.consume() # Ensure result is consumed
try: try:
async with self._driver.session(database=self._DATABASE) as session: async with self._driver.session(database=self._DATABASE) as session:
await session.execute_write(_do_upsert_edge) async def execute_upsert(tx: AsyncManagedTransaction):
query = f"""
MATCH (source:`{source_label}`)
WITH source
MATCH (target:`{target_label}`)
MERGE (source)-[r:DIRECTED]-(target)
SET r += $properties
RETURN r, source, target
"""
result = await tx.run(query, properties=edge_properties)
try:
records = await result.fetch(100)
if len(records) > 1:
source_nodes = [dict(r['source']) for r in records]
target_nodes = [dict(r['target']) for r in records]
logger.warning(
f"Multiple edges created: found {len(records)} results for edge between "
f"source label '{source_label}' and target label '{target_label}'. "
f"Source nodes: {source_nodes}, "
f"Target nodes: {target_nodes}. "
"Using first edge only."
)
if records:
logger.debug(
f"Upserted edge from '{source_label}' to '{target_label}' "
f"with properties: {edge_properties}"
)
finally:
await result.consume() # Ensure result is consumed
await session.execute_write(execute_upsert)
except Exception as e: except Exception as e:
logger.error(f"Error during edge upsert: {str(e)}") logger.error(f"Error during edge upsert: {str(e)}")
raise raise