From 73452e63fa76f6b710de42ca34e4f5823c27e01a Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Mar 2025 22:48:12 +0800 Subject: [PATCH] Add async lock for atomic graph database operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Introduced graph_db_lock mechanism • Ensured atomic node/edge merge and insert operation --- lightrag/operate.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/lightrag/operate.py b/lightrag/operate.py index 30983145..f89a551d 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -519,19 +519,24 @@ async def extract_entities( for k, v in m_edges.items(): maybe_edges[tuple(sorted(k))].extend(v) - all_entities_data = await asyncio.gather( - *[ - _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config) - for k, v in maybe_nodes.items() - ] - ) + from .kg.shared_storage import get_graph_db_lock + graph_db_lock = get_graph_db_lock(enable_logging = True) + + # Ensure that nodes and edges are merged and upserted atomically + async with graph_db_lock: + all_entities_data = await asyncio.gather( + *[ + _merge_nodes_then_upsert(k, v, knowledge_graph_inst, global_config) + for k, v in maybe_nodes.items() + ] + ) - all_relationships_data = await asyncio.gather( - *[ - _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config) - for k, v in maybe_edges.items() - ] - ) + all_relationships_data = await asyncio.gather( + *[ + _merge_edges_then_upsert(k[0], k[1], v, knowledge_graph_inst, global_config) + for k, v in maybe_edges.items() + ] + ) if not (all_entities_data or all_relationships_data): log_message = "Didn't extract any entities and relationships."