Fix linting
@@ -833,7 +833,7 @@ class Neo4JStorage(BaseGraphStorage):
                 logger.info(
                     f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
                 )

         except neo4jExceptions.ClientError as e:
             logger.warning(f"APOC plugin error: {str(e)}")
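Note on the hunk above: the warning branch is the fallback path for when the APOC-based subgraph query is rejected by the server. A rough, hypothetical sketch of that pattern with the official neo4j async driver follows; the helper name fetch_subgraph and the Cypher text are illustrative assumptions, not LightRAG's actual query.

import logging

from neo4j import exceptions as neo4jExceptions

logger = logging.getLogger(__name__)


async def fetch_subgraph(session, entity_id: str, max_depth: int = 3):
    # Illustrative only: expand a subgraph around one node via APOC.
    try:
        cursor = await session.run(
            "MATCH (n {entity_id: $entity_id}) "
            "CALL apoc.path.subgraphAll(n, {maxLevel: $max_depth}) "
            "YIELD nodes, relationships RETURN nodes, relationships",
            entity_id=entity_id,
            max_depth=max_depth,
        )
        record = await cursor.single()
        if record is not None:
            logger.info(
                f"Subgraph query successful | Node count: {len(record['nodes'])} "
                f"| Edge count: {len(record['relationships'])}"
            )
        return record
    except neo4jExceptions.ClientError as e:
        # Typically raised when the APOC plugin is missing on the server.
        logger.warning(f"APOC plugin error: {str(e)}")
        return None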
@@ -442,9 +442,10 @@ async def extract_entities(
     total_chunks = len(ordered_chunks)
     total_entities_count = 0
     total_relations_count = 0

     # Get lock manager from shared storage
     from .kg.shared_storage import get_graph_db_lock

     graph_db_lock = get_graph_db_lock(enable_logging=False)

     async def _user_llm_func_with_cache(
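The graph_db_lock obtained above is what makes the later merge-and-upsert block atomic when several chunks are processed concurrently. A minimal sketch of such a process-wide asyncio lock manager, assuming the pattern rather than reproducing the real .kg.shared_storage module:

import asyncio

# Hypothetical stand-in for get_graph_db_lock from .kg.shared_storage:
# every caller in the same event loop receives the same asyncio.Lock,
# so graph merges from different chunks are serialized.
_graph_db_lock: asyncio.Lock | None = None


def get_graph_db_lock(enable_logging: bool = False) -> asyncio.Lock:
    global _graph_db_lock
    if _graph_db_lock is None:
        _graph_db_lock = asyncio.Lock()
    if enable_logging:
        print("graph_db_lock requested")  # real code would use a logger
    return _graph_db_lock


async def process_chunk(chunk_id: str) -> None:
    async with get_graph_db_lock(enable_logging=False):
        # Entity/relationship merges and vector-db upserts for one chunk
        # run here without interleaving with another chunk's updates.
        ...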
@@ -602,11 +603,11 @@ async def extract_entities(
         async with pipeline_status_lock:
             pipeline_status["latest_message"] = log_message
             pipeline_status["history_messages"].append(log_message)

         # Use graph database lock to ensure atomic merges and updates
         chunk_entities_data = []
         chunk_relationships_data = []

         async with graph_db_lock:
             # Process and update entities
             for entity_name, entities in maybe_nodes.items():
@@ -614,16 +615,20 @@ async def extract_entities(
                     entity_name, entities, knowledge_graph_inst, global_config
                 )
                 chunk_entities_data.append(entity_data)

             # Process and update relationships
             for edge_key, edges in maybe_edges.items():
                 # Ensure edge direction consistency
                 sorted_edge_key = tuple(sorted(edge_key))
                 edge_data = await _merge_edges_then_upsert(
-                    sorted_edge_key[0], sorted_edge_key[1], edges, knowledge_graph_inst, global_config
+                    sorted_edge_key[0],
+                    sorted_edge_key[1],
+                    edges,
+                    knowledge_graph_inst,
+                    global_config,
                 )
                 chunk_relationships_data.append(edge_data)

             # Update vector database (within the same lock to ensure atomicity)
             if entity_vdb is not None and chunk_entities_data:
                 data_for_vdb = {
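The sorted_edge_key normalization in this hunk is what keeps an undirected relationship from being merged under two different keys: whichever order the extractor emits the endpoints in, they collapse to the same tuple. For example:

# ("Beta Corp", "Acme Inc") and ("Acme Inc", "Beta Corp") describe the same edge,
# so both normalize to one merge key before _merge_edges_then_upsert runs.
edge_key = ("Beta Corp", "Acme Inc")
sorted_edge_key = tuple(sorted(edge_key))
assert sorted_edge_key == ("Acme Inc", "Beta Corp")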
@@ -637,7 +642,7 @@ async def extract_entities(
                     for dp in chunk_entities_data
                 }
                 await entity_vdb.upsert(data_for_vdb)

             if relationships_vdb is not None and chunk_relationships_data:
                 data_for_vdb = {
                     compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
@@ -651,7 +656,7 @@ async def extract_entities(
                     for dp in chunk_relationships_data
                 }
                 await relationships_vdb.upsert(data_for_vdb)

             # Update counters
             total_entities_count += len(chunk_entities_data)
             total_relations_count += len(chunk_relationships_data)
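The relationship records above are keyed by compute_mdhash_id over the concatenated src_id and tgt_id, which makes repeated upserts of the same relationship idempotent. A small sketch of that kind of helper, assuming an MD5-based implementation (the real compute_mdhash_id lives in LightRAG's utils and is not reproduced here):

from hashlib import md5


def compute_mdhash_id(content: str, prefix: str = "") -> str:
    # Deterministic key: the same input string always yields the same ID,
    # so upserting the same relationship twice overwrites one vector-db row.
    return prefix + md5(content.encode()).hexdigest()


# Same endpoints (in the same order) -> same "rel-..." key on every run.
rel_id = compute_mdhash_id("Acme Inc" + "Beta Corp", prefix="rel-")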