diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 82bb54a2..ba53a349 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -833,7 +833,7 @@ class Neo4JStorage(BaseGraphStorage): logger.info( f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}" - ) + ) except neo4jExceptions.ClientError as e: logger.warning(f"APOC plugin error: {str(e)}") diff --git a/lightrag/operate.py b/lightrag/operate.py index d14426ab..a944860a 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -442,9 +442,10 @@ async def extract_entities( total_chunks = len(ordered_chunks) total_entities_count = 0 total_relations_count = 0 - + # Get lock manager from shared storage from .kg.shared_storage import get_graph_db_lock + graph_db_lock = get_graph_db_lock(enable_logging=False) async def _user_llm_func_with_cache( @@ -602,11 +603,11 @@ async def extract_entities( async with pipeline_status_lock: pipeline_status["latest_message"] = log_message pipeline_status["history_messages"].append(log_message) - + # Use graph database lock to ensure atomic merges and updates chunk_entities_data = [] chunk_relationships_data = [] - + async with graph_db_lock: # Process and update entities for entity_name, entities in maybe_nodes.items(): @@ -614,16 +615,20 @@ async def extract_entities( entity_name, entities, knowledge_graph_inst, global_config ) chunk_entities_data.append(entity_data) - + # Process and update relationships for edge_key, edges in maybe_edges.items(): # Ensure edge direction consistency sorted_edge_key = tuple(sorted(edge_key)) edge_data = await _merge_edges_then_upsert( - sorted_edge_key[0], sorted_edge_key[1], edges, knowledge_graph_inst, global_config + sorted_edge_key[0], + sorted_edge_key[1], + edges, + knowledge_graph_inst, + global_config, ) chunk_relationships_data.append(edge_data) - + # Update vector database (within the same lock to ensure atomicity) if entity_vdb is not None and chunk_entities_data: data_for_vdb = { @@ -637,7 +642,7 @@ async def extract_entities( for dp in chunk_entities_data } await entity_vdb.upsert(data_for_vdb) - + if relationships_vdb is not None and chunk_relationships_data: data_for_vdb = { compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): { @@ -651,7 +656,7 @@ async def extract_entities( for dp in chunk_relationships_data } await relationships_vdb.upsert(data_for_vdb) - + # Update counters total_entities_count += len(chunk_entities_data) total_relations_count += len(chunk_relationships_data)