diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 153dc27a..0933a4d1 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -902,6 +902,13 @@ class LightRAG:
             # Get file path from status document
             file_path = getattr(status_doc, "file_path", "unknown_source")
 
+            async with pipeline_status_lock:
+                log_message = f"Processing file: {file_path}"
+                pipeline_status["history_messages"].append(log_message)
+                log_message = f"Processing d-id: {doc_id}"
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
+
             # Generate chunks from document
             chunks: dict[str, Any] = {
                 compute_mdhash_id(dp["content"], prefix="chunk-"): {
diff --git a/lightrag/operate.py b/lightrag/operate.py
index b1b803db..02d9c85e 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -613,11 +613,17 @@ async def extract_entities(
                 glean_result, chunk_key, file_path
             )
 
-            # Merge results
+            # Merge results - only add entities and edges with new names
            for entity_name, entities in glean_nodes.items():
-                maybe_nodes[entity_name].extend(entities)
+                if (
+                    entity_name not in maybe_nodes
+                ):  # Only accept entities with a new name in the gleaning stage
+                    maybe_nodes[entity_name].extend(entities)
             for edge_key, edges in glean_edges.items():
-                maybe_edges[edge_key].extend(edges)
+                if (
+                    edge_key not in maybe_edges
+                ):  # Only accept edges with a new key in the gleaning stage
+                    maybe_edges[edge_key].extend(edges)
 
             if now_glean_index == entity_extract_max_gleaning - 1:
                 break
@@ -636,7 +642,7 @@ async def extract_entities(
         processed_chunks += 1
         entities_count = len(maybe_nodes)
         relations_count = len(maybe_edges)
-        log_message = f" Chk {processed_chunks}/{total_chunks}: extracted {entities_count} Ent + {relations_count} Rel (deduplicated)"
+        log_message = f"Chk {processed_chunks}/{total_chunks}: extracted {entities_count} Ent + {relations_count} Rel (deduplicated)"
         logger.info(log_message)
         if pipeline_status is not None:
             async with pipeline_status_lock:
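
For reviewers, here is a minimal standalone sketch of the merge behavior this patch changes in `extract_entities`: results from a gleaning pass are now only accepted for entity names (and, analogously, edge keys) that were not already produced by the initial extraction pass. The `defaultdict` containers and the sample records below are illustrative assumptions, not code copied from the repository.

```python
from collections import defaultdict

# Illustrative container mirroring maybe_nodes in extract_entities.
maybe_nodes: defaultdict[str, list] = defaultdict(list)

# Result from the initial extraction pass (hypothetical sample data).
maybe_nodes["Alice"].append({"description": "initial pass"})

# Result from a gleaning pass (hypothetical sample data).
glean_nodes = {
    "Alice": [{"description": "gleaned duplicate"}],  # name already known -> skipped
    "Bob": [{"description": "gleaned new entity"}],   # new name -> accepted
}

# Merge as in the patched code: only accept entities with a new name.
for entity_name, entities in glean_nodes.items():
    if entity_name not in maybe_nodes:
        maybe_nodes[entity_name].extend(entities)

print(sorted(maybe_nodes))        # ['Alice', 'Bob']
print(len(maybe_nodes["Alice"]))  # 1 - the gleaned duplicate was not appended
```

Note that because `maybe_nodes` is a `defaultdict`, indexing a missing key would silently insert it, so the membership check has to come before any indexing, which is exactly how the patched loop is structured.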