Add async lock for atomic graph database operations

• Introduced graph_db_lock mechanism
• Ensured atomic node/edge merge and insert operation
This commit is contained in:
yangdx
2025-03-08 22:48:12 +08:00
parent 95c06f1bde
commit 73452e63fa

View File

@@ -519,19 +519,24 @@ async def extract_entities(
for k, v in m_edges.items(): for k, v in m_edges.items():
maybe_edges[tuple(sorted(k))].extend(v) maybe_edges[tuple(sorted(k))].extend(v)
all_entities_data = await asyncio.gather( from .kg.shared_storage import get_graph_db_lock
*[ graph_db_lock = get_graph_db_lock(enable_logging = True)
_merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
for k, v in maybe_nodes.items() # Ensure that nodes and edges are merged and upserted atomically
] async with graph_db_lock:
) all_entities_data = await asyncio.gather(
*[
_merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config)
for k, v in maybe_nodes.items()
]
)
all_relationships_data = await asyncio.gather( all_relationships_data = await asyncio.gather(
*[ *[
_merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config) _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config)
for k, v in maybe_edges.items() for k, v in maybe_edges.items()
] ]
) )
if not (all_entities_data or all_relationships_data): if not (all_entities_data or all_relationships_data):
log_message = "Didn't extract any entities and relationships." log_message = "Didn't extract any entities and relationships."