diff --git a/lightrag/operate.py b/lightrag/operate.py
index 97a356ad..ee5dd469 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -106,6 +106,8 @@ async def _handle_entity_relation_summary(
     entity_or_relation_name: str,
     description: str,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
 ) -> str:
     """Handle entity relation summary
     For each entity or relation, input is the combined description of already existing description and new description.
@@ -122,6 +124,14 @@
     tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
     if len(tokens) < summary_max_tokens:  # No need for summary
         return description
+
+    # Update pipeline status when LLM summary is needed
+    status_message = "Use LLM to re-summary description..."
+    logger.info(status_message)
+    if pipeline_status is not None and pipeline_status_lock is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = status_message
+            pipeline_status["history_messages"].append(status_message)
     prompt_template = PROMPTS["summarize_entity_descriptions"]
     use_description = decode_tokens_by_tiktoken(
         tokens[:llm_max_tokens], model_name=tiktoken_model_name
     )
@@ -212,6 +222,8 @@ async def _merge_nodes_then_upsert(
     nodes_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
 ):
     """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
     already_entity_types = []
@@ -221,6 +233,14 @@
     already_node = await knowledge_graph_inst.get_node(entity_name)
     if already_node is not None:
+        # Update pipeline status when a node that needs merging is found
+        status_message = f"Merging entities: {entity_name}"
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+
         already_entity_types.append(already_node["entity_type"])
         already_source_ids.extend(
             split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
         )
@@ -249,7 +269,7 @@
     logger.debug(f"file_path: {file_path}")

     description = await _handle_entity_relation_summary(
-        entity_name, description, global_config
+        entity_name, description, global_config, pipeline_status, pipeline_status_lock
     )
     node_data = dict(
         entity_id=entity_name,
@@ -272,6 +292,8 @@ async def _merge_edges_then_upsert(
     edges_data: list[dict],
     knowledge_graph_inst: BaseGraphStorage,
     global_config: dict,
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
 ):
     already_weights = []
     already_source_ids = []
@@ -280,6 +302,14 @@
     already_file_paths = []

     if await knowledge_graph_inst.has_edge(src_id, tgt_id):
+        # Update pipeline status when an edge that needs merging is found
+        status_message = f"Merging edges: {src_id} - {tgt_id}"
+        logger.info(status_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = status_message
+                pipeline_status["history_messages"].append(status_message)
+
         already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
         # Handle the case where get_edge returns None or missing fields
         if already_edge:
@@ -358,7 +388,11 @@
                 },
             )
     description = await _handle_entity_relation_summary(
-        f"({src_id}, {tgt_id})", description, global_config
+        f"({src_id}, {tgt_id})",
+        description,
+        global_config,
+        pipeline_status,
+        pipeline_status_lock,
     )
     await knowledge_graph_inst.upsert_edge(
         src_id,
@@ -613,7 +647,12 @@
        # Process and update entities
        for entity_name, entities in maybe_nodes.items():
            entity_data = await _merge_nodes_then_upsert(
-                entity_name, entities, knowledge_graph_inst, global_config
+                entity_name,
+                entities,
+                knowledge_graph_inst,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
            )
            chunk_entities_data.append(entity_data)

@@ -627,6 +666,8 @@
                edges,
                knowledge_graph_inst,
                global_config,
+                pipeline_status,
+                pipeline_status_lock,
            )
            chunk_relationships_data.append(edge_data)
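Note: the status-update block added by this patch is duplicated verbatim at three sites (LLM re-summary, node merge, edge merge). A follow-up could factor it into one helper. Below is a minimal sketch of that pattern, not part of this diff: the helper name `_report_pipeline_status` is hypothetical, the module-level `logger` stands in for the one operate.py already defines, and the `latest_message`/`history_messages` keys are taken from the hunks above.

```python
import asyncio
import logging

# Stand-in for the module logger that operate.py already defines.
logger = logging.getLogger(__name__)


async def _report_pipeline_status(
    status_message: str,
    pipeline_status: dict = None,
    pipeline_status_lock=None,
) -> None:
    """Hypothetical helper: log a status message and, when a shared
    status dict and its lock are provided, record it under the lock so
    concurrent workers cannot interleave updates."""
    logger.info(status_message)
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = status_message
            pipeline_status["history_messages"].append(status_message)


async def main() -> None:
    # Shape implied by the patch: callers share one dict and one lock.
    pipeline_status = {"latest_message": "", "history_messages": []}
    pipeline_status_lock = asyncio.Lock()
    await _report_pipeline_status(
        "Merging entities: example", pipeline_status, pipeline_status_lock
    )
    print(pipeline_status["latest_message"])


if __name__ == "__main__":
    asyncio.run(main())
```

Either way, keeping the `None` defaults (as the patch does) leaves the new parameters optional, so existing callers of these helpers continue to work unchanged.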