Serialize merge precess to prevent race conditions

This commit is contained in:
yangdx
2025-04-03 21:33:46 +08:00
parent 2bb3822d05
commit 6b240fa9b2
3 changed files with 17 additions and 16 deletions

View File

@@ -832,7 +832,7 @@ class Neo4JStorage(BaseGraphStorage):
seen_edges.add(edge_id)
logger.info(
f"Process {os.getpid()} graph query return: {len(result.nodes)} nodes, {len(result.edges)} edges"
f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
)
except neo4jExceptions.ClientError as e:

View File

@@ -1612,6 +1612,9 @@ class PGGraphStorage(BaseGraphStorage):
is_truncated=is_truncated,
)
logger.info(
f"Subgraph query successful | Node count: {len(kg.nodes)} | Edge count: {len(kg.edges)}"
)
return kg
async def drop(self) -> dict[str, str]:

View File

@@ -616,21 +616,19 @@ async def extract_entities(
# Ensure that nodes and edges are merged and upserted atomically
async with graph_db_lock:
all_entities_data = await asyncio.gather(
*[
_merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
for k, v in maybe_nodes.items()
]
)
# serial processing nodes under lock
all_entities_data = []
for k, v in maybe_nodes.items():
entity_data = await _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
all_entities_data.append(entity_data)
all_relationships_data = await asyncio.gather(
*[
_merge_edges_then_upsert(
# serial processing edges under lock
all_relationships_data = []
for k, v in maybe_edges.items():
edge_data = await _merge_edges_then_upsert(
k[0], k[1], v, knowledge_graph_inst, global_config
)
for k, v in maybe_edges.items()
]
)
all_relationships_data.append(edge_data)
if not (all_entities_data or all_relationships_data):
log_message = "Didn't extract any entities and relationships."