Add node and edge merging log to pipeline_status

Author: yangdx
Date: 2025-04-10 00:56:35 +08:00
parent 58e47ed364
commit 5d286dd0fa


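Every change below follows the same pattern: when a node/edge merge or an LLM re-summarization starts, the message is logged and, if the caller supplied a shared status dict and its lock, the message is also written into that dict. The following is a minimal, self-contained sketch of that pattern, not code from this commit; the helper name _report_status is hypothetical, the only assumptions about pipeline_status are the two keys ("latest_message", "history_messages") that the diff itself uses, and pipeline_status_lock is assumed to be an asyncio.Lock.

import asyncio
import logging

logger = logging.getLogger(__name__)

async def _report_status(
    status_message: str,
    pipeline_status: dict = None,
    pipeline_status_lock: asyncio.Lock = None,
) -> None:
    # Always log locally; only touch the shared dict when both the dict and
    # its lock were provided, mirroring the guard used throughout this diff.
    logger.info(status_message)
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = status_message
            pipeline_status["history_messages"].append(status_message)

Holding the lock for both writes keeps latest_message and the appended history entry consistent when several chunks are merged concurrently.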
@@ -106,6 +106,8 @@ async def _handle_entity_relation_summary(
entity_or_relation_name: str,
description: str,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
) -> str:
"""Handle entity relation summary
For each entity or relation, the input is the already existing description combined with the new description.
@@ -122,6 +124,14 @@ async def _handle_entity_relation_summary(
tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
if len(tokens) < summary_max_tokens: # No need for summary
return description
# Update pipeline status when LLM summary is needed
status_message = "Use LLM to re-summary description..."
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
prompt_template = PROMPTS["summarize_entity_descriptions"]
use_description = decode_tokens_by_tiktoken(
tokens[:llm_max_tokens], model_name=tiktoken_model_name
@@ -212,6 +222,8 @@ async def _merge_nodes_then_upsert(
nodes_data: list[dict],
knowledge_graph_inst: BaseGraphStorage,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
):
"""Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
already_entity_types = []
@@ -221,6 +233,14 @@ async def _merge_nodes_then_upsert(
already_node = await knowledge_graph_inst.get_node(entity_name)
if already_node is not None:
# Update pipeline status when a node that needs merging is found
status_message = f"Merging entities: {entity_name}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
already_entity_types.append(already_node["entity_type"])
already_source_ids.extend(
split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
@@ -249,7 +269,7 @@ async def _merge_nodes_then_upsert(
logger.debug(f"file_path: {file_path}")
description = await _handle_entity_relation_summary(
entity_name, description, global_config
entity_name, description, global_config, pipeline_status, pipeline_status_lock
)
node_data = dict(
entity_id=entity_name,
@@ -272,6 +292,8 @@ async def _merge_edges_then_upsert(
edges_data: list[dict],
knowledge_graph_inst: BaseGraphStorage,
global_config: dict,
pipeline_status: dict = None,
pipeline_status_lock=None,
):
already_weights = []
already_source_ids = []
@@ -280,6 +302,14 @@ async def _merge_edges_then_upsert(
already_file_paths = []
if await knowledge_graph_inst.has_edge(src_id, tgt_id):
# Update pipeline status when an edge that needs merging is found
status_message = f"Merging edges: {src_id} - {tgt_id}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
# Handle the case where get_edge returns None or missing fields
if already_edge:
@@ -358,7 +388,11 @@ async def _merge_edges_then_upsert(
},
)
description = await _handle_entity_relation_summary(
f"({src_id}, {tgt_id})", description, global_config
f"({src_id}, {tgt_id})",
description,
global_config,
pipeline_status,
pipeline_status_lock,
)
await knowledge_graph_inst.upsert_edge(
src_id,
@@ -613,7 +647,12 @@ async def extract_entities(
# Process and update entities
for entity_name, entities in maybe_nodes.items():
entity_data = await _merge_nodes_then_upsert(
entity_name, entities, knowledge_graph_inst, global_config
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
)
chunk_entities_data.append(entity_data)
@@ -627,6 +666,8 @@ async def extract_entities(
edges,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
)
chunk_relationships_data.append(edge_data)
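
To see the effect end to end, here is a toy driver, not LightRAG code: fake_merge stands in for _merge_nodes_then_upsert and models only the status update introduced above, and the dict/lock setup shows one plausible way a caller could construct the objects that are threaded through extract_entities.

import asyncio

async def fake_merge(
    entity_name: str, pipeline_status: dict, pipeline_status_lock: asyncio.Lock
) -> None:
    # Stand-in for _merge_nodes_then_upsert: only the new status update is modeled.
    status_message = f"Merging entities: {entity_name}"
    async with pipeline_status_lock:
        pipeline_status["latest_message"] = status_message
        pipeline_status["history_messages"].append(status_message)

async def main() -> None:
    pipeline_status = {"latest_message": "", "history_messages": []}
    pipeline_status_lock = asyncio.Lock()
    # Merges may run concurrently; the lock keeps history_messages consistent.
    await asyncio.gather(
        *(fake_merge(name, pipeline_status, pipeline_status_lock) for name in ("foo", "bar"))
    )
    print(pipeline_status["history_messages"])

asyncio.run(main())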