Merge pull request #1337 from danielaskdd/main
Only merge new entities/edges during gleaning stage
This commit is contained in:
@@ -902,6 +902,13 @@ class LightRAG:
|
|||||||
# Get file path from status document
|
# Get file path from status document
|
||||||
file_path = getattr(status_doc, "file_path", "unknown_source")
|
file_path = getattr(status_doc, "file_path", "unknown_source")
|
||||||
|
|
||||||
|
async with pipeline_status_lock:
|
||||||
|
log_message = f"Processing file: {file_path}"
|
||||||
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
log_message = f"Processing d-id: {doc_id}"
|
||||||
|
pipeline_status["latest_message"] = log_message
|
||||||
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
# Generate chunks from document
|
# Generate chunks from document
|
||||||
chunks: dict[str, Any] = {
|
chunks: dict[str, Any] = {
|
||||||
compute_mdhash_id(dp["content"], prefix="chunk-"): {
|
compute_mdhash_id(dp["content"], prefix="chunk-"): {
|
||||||
|
@@ -613,10 +613,16 @@ async def extract_entities(
|
|||||||
glean_result, chunk_key, file_path
|
glean_result, chunk_key, file_path
|
||||||
)
|
)
|
||||||
|
|
||||||
# Merge results
|
# Merge results - only add entities and edges with new names
|
||||||
for entity_name, entities in glean_nodes.items():
|
for entity_name, entities in glean_nodes.items():
|
||||||
|
if (
|
||||||
|
entity_name not in maybe_nodes
|
||||||
|
): # Only accetp entities with new name in gleaning stage
|
||||||
maybe_nodes[entity_name].extend(entities)
|
maybe_nodes[entity_name].extend(entities)
|
||||||
for edge_key, edges in glean_edges.items():
|
for edge_key, edges in glean_edges.items():
|
||||||
|
if (
|
||||||
|
edge_key not in maybe_edges
|
||||||
|
): # Only accetp edges with new name in gleaning stage
|
||||||
maybe_edges[edge_key].extend(edges)
|
maybe_edges[edge_key].extend(edges)
|
||||||
|
|
||||||
if now_glean_index == entity_extract_max_gleaning - 1:
|
if now_glean_index == entity_extract_max_gleaning - 1:
|
||||||
@@ -636,7 +642,7 @@ async def extract_entities(
|
|||||||
processed_chunks += 1
|
processed_chunks += 1
|
||||||
entities_count = len(maybe_nodes)
|
entities_count = len(maybe_nodes)
|
||||||
relations_count = len(maybe_edges)
|
relations_count = len(maybe_edges)
|
||||||
log_message = f" Chk {processed_chunks}/{total_chunks}: extracted {entities_count} Ent + {relations_count} Rel (deduplicated)"
|
log_message = f"Chk {processed_chunks}/{total_chunks}: extracted {entities_count} Ent + {relations_count} Rel (deduplicated)"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
if pipeline_status is not None:
|
if pipeline_status is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
Reference in New Issue
Block a user