diff --git a/env.example b/env.example
index 3dbca084..cb6b8e28 100644
--- a/env.example
+++ b/env.example
@@ -43,11 +43,15 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
 SUMMARY_LANGUAGE=English
 # CHUNK_SIZE=1200
 # CHUNK_OVERLAP_SIZE=100
-### Max tokens for entity or relations summary
-# MAX_TOKEN_SUMMARY=500
+
 ### Number of parallel processing documents in one patch
 # MAX_PARALLEL_INSERT=2
+### Max tokens for entity/relations description after merge
+# MAX_TOKEN_SUMMARY=500
+### Number of entities/edges to trigger LLM re-summary on merge (at least 3 is recommended)
+# FORCE_LLM_SUMMARY_ON_MERGE=6
+
 
 ### Num of chunks send to Embedding in single request
 # EMBEDDING_BATCH_NUM=32
 ### Max concurrency requests for Embedding
diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py
index e9772854..a1dade88 100644
--- a/lightrag/api/utils_api.py
+++ b/lightrag/api/utils_api.py
@@ -261,8 +261,12 @@ def display_splash_screen(args: argparse.Namespace) -> None:
     ASCIIColors.yellow(f"{args.chunk_overlap_size}")
     ASCIIColors.white("    ├─ Cosine Threshold: ", end="")
     ASCIIColors.yellow(f"{args.cosine_threshold}")
-    ASCIIColors.white("    └─ Top-K: ", end="")
+    ASCIIColors.white("    ├─ Top-K: ", end="")
     ASCIIColors.yellow(f"{args.top_k}")
+    ASCIIColors.white("    ├─ Max Token Summary: ", end="")
+    ASCIIColors.yellow(f"{int(os.getenv('MAX_TOKEN_SUMMARY', 500))}")
+    ASCIIColors.white("    └─ Force LLM Summary on Merge: ", end="")
+    ASCIIColors.yellow(f"{int(os.getenv('FORCE_LLM_SUMMARY_ON_MERGE', 6))}")
 
     # System Configuration
     ASCIIColors.magenta("\n💾 Storage Configuration:")
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 50ee079a..153dc27a 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -103,8 +103,10 @@ class LightRAG:
     entity_extract_max_gleaning: int = field(default=1)
     """Maximum number of entity extraction attempts for ambiguous content."""
 
-    entity_summary_to_max_tokens: int = field(
-        default=int(os.getenv("MAX_TOKEN_SUMMARY", 500))
+    summary_to_max_tokens: int = field(default=int(os.getenv("MAX_TOKEN_SUMMARY", 500)))
+
+    force_llm_summary_on_merge: int = field(
+        default=int(os.getenv("FORCE_LLM_SUMMARY_ON_MERGE", 6))
     )
 
     # Text chunking
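Note on usage: the two knobs surface as fields on the `LightRAG` dataclass (see the `lightrag/lightrag.py` hunk above), and their defaults are read from the environment at class-definition time, so overrides must be in place before the import. Below is a minimal sketch, not part of this diff; the working directory is a hypothetical placeholder. Also note the rename from `entity_summary_to_max_tokens` to `summary_to_max_tokens` is breaking for callers that passed the old field name.

```python
import os

# Dataclass defaults are evaluated when lightrag.lightrag is imported,
# so environment overrides must be set before the import below.
os.environ.setdefault("MAX_TOKEN_SUMMARY", "500")
os.environ.setdefault("FORCE_LLM_SUMMARY_ON_MERGE", "6")

from lightrag import LightRAG

# Alternatively, set the fields directly and skip the environment entirely.
rag = LightRAG(
    working_dir="./rag_storage",      # hypothetical path
    summary_to_max_tokens=500,        # renamed from entity_summary_to_max_tokens
    force_llm_summary_on_merge=6,     # LLM re-summary once a merge reaches 6 fragments
)
```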
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 53ce47e9..3f5c3dfb 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -117,15 +117,13 @@ async def _handle_entity_relation_summary(
     use_llm_func: callable = global_config["llm_model_func"]
     llm_max_tokens = global_config["llm_model_max_token_size"]
     tiktoken_model_name = global_config["tiktoken_model_name"]
-    summary_max_tokens = global_config["entity_summary_to_max_tokens"]
+    summary_max_tokens = global_config["summary_to_max_tokens"]
+
     language = global_config["addon_params"].get(
         "language", PROMPTS["DEFAULT_LANGUAGE"]
     )
 
     tokens = encode_string_by_tiktoken(description, model_name=tiktoken_model_name)
-    if len(tokens) < summary_max_tokens:  # No need for summary
-        return description
-
     prompt_template = PROMPTS["summarize_entity_descriptions"]
     use_description = decode_tokens_by_tiktoken(
         tokens[:llm_max_tokens], model_name=tiktoken_model_name
@@ -137,14 +135,6 @@ async def _handle_entity_relation_summary(
     )
     use_prompt = prompt_template.format(**context_base)
     logger.debug(f"Trigger summary: {entity_or_relation_name}")
-    # Update pipeline status when LLM summary is needed
-    status_message = " == Use LLM == to re-summary description..."
-    logger.info(status_message)
-    if pipeline_status is not None and pipeline_status_lock is not None:
-        async with pipeline_status_lock:
-            pipeline_status["latest_message"] = status_message
-            pipeline_status["history_messages"].append(status_message)
-
     # Use LLM function with cache
     summary = await use_llm_func_with_cache(
         use_prompt,
@@ -270,23 +260,34 @@ async def _merge_nodes_then_upsert(
         set([dp["file_path"] for dp in nodes_data] + already_file_paths)
     )
 
-    if len(nodes_data) > 1 or len(already_entity_types) > 0:
-        # Update pipeline status when a node that needs merging
-        status_message = f"Merging entity: {entity_name} | {len(nodes_data)}+{len(already_entity_types)}"
-        logger.info(status_message)
-        if pipeline_status is not None and pipeline_status_lock is not None:
-            async with pipeline_status_lock:
-                pipeline_status["latest_message"] = status_message
-                pipeline_status["history_messages"].append(status_message)
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
 
-        description = await _handle_entity_relation_summary(
-            entity_name,
-            description,
-            global_config,
-            pipeline_status,
-            pipeline_status_lock,
-            llm_response_cache,
-        )
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
+
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                entity_name,
+                description,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
     node_data = dict(
         entity_id=entity_name,
@@ -398,23 +399,36 @@ async def _merge_edges_then_upsert(
         },
     )
 
-    if len(edges_data) > 1 or len(already_weights) > 0:
-        # Update pipeline status when a edge that needs merging
-        status_message = f"Merging edge::: {src_id} - {tgt_id} | {len(edges_data)}+{len(already_weights)}"
-        logger.info(status_message)
-        if pipeline_status is not None and pipeline_status_lock is not None:
-            async with pipeline_status_lock:
-                pipeline_status["latest_message"] = status_message
-                pipeline_status["history_messages"].append(status_message)
+    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
 
-        description = await _handle_entity_relation_summary(
-            f"({src_id}, {tgt_id})",
-            description,
-            global_config,
-            pipeline_status,
-            pipeline_status_lock,
-            llm_response_cache,
-        )
+    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
+    num_new_fragment = len(
+        set([dp["description"] for dp in edges_data if dp.get("description")])
+    )
+
+    if num_fragment > 1:
+        if num_fragment >= force_llm_summary_on_merge:
+            status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
+            description = await _handle_entity_relation_summary(
+                f"({src_id}, {tgt_id})",
+                description,
+                global_config,
+                pipeline_status,
+                pipeline_status_lock,
+                llm_response_cache,
+            )
+        else:
+            status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
+            logger.info(status_message)
+            if pipeline_status is not None and pipeline_status_lock is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = status_message
+                    pipeline_status["history_messages"].append(status_message)
 
     await knowledge_graph_inst.upsert_edge(
         src_id,
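Note on the reworked call sites: `_merge_nodes_then_upsert` and `_merge_edges_then_upsert` now gate the LLM on fragment count rather than token count. Merged descriptions accumulate as `GRAPH_FIELD_SEP`-joined fragments, and the `N+M` in the status messages reads as new distinct fragments plus previously stored ones. A condensed sketch of the gate follows (standalone illustration, not the library API; `GRAPH_FIELD_SEP` is `"<SEP>"` as defined in `lightrag/prompt.py`):

```python
GRAPH_FIELD_SEP = "<SEP>"  # fragment separator between merged descriptions

def needs_llm_summary(description: str, force_llm_summary_on_merge: int = 6) -> bool:
    """True once enough fragments have piled up to justify an LLM re-summary."""
    num_fragment = description.count(GRAPH_FIELD_SEP) + 1
    return num_fragment > 1 and num_fragment >= force_llm_summary_on_merge

# Two fragments: kept as plain concatenation, no LLM call ("Merge N: ... | 2+0").
assert not needs_llm_summary("a city in Norway<SEP>capital of Norway")
# Six fragments: default threshold reached, LLM rewrites it ("LLM merge N: ...").
assert needs_llm_summary("<SEP>".join(f"fragment {i}" for i in range(6)))
```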
diff --git a/lightrag/utils.py b/lightrag/utils.py
index dd7f217a..fd188498 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -967,7 +967,7 @@ async def use_llm_func_with_cache(
         res: str = await use_llm_func(input_text, **kwargs)
 
         # Save to cache
-        logger.info(f"Saving LLM cache for {arg_hash}")
+        logger.info(f" == LLM cache == saving {arg_hash}")
         await save_to_cache(
             llm_response_cache,
             CacheData(
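Behavioral note: with the early `len(tokens) < summary_max_tokens` return removed from `_handle_entity_relation_summary`, the decision to summarize now lives entirely at the call sites, and the function always clips its input to `llm_model_max_token_size` before prompting. A rough standalone illustration of that token-level truncation using plain `tiktoken` (the helper name and model choice are assumptions, not code from this diff):

```python
import tiktoken

def clip_to_token_budget(text: str, max_tokens: int, model_name: str = "gpt-4") -> str:
    """Token-level truncation, mirroring decode_tokens_by_tiktoken(tokens[:llm_max_tokens], ...)."""
    enc = tiktoken.encoding_for_model(model_name)
    tokens = enc.encode(text)
    # Keep at most max_tokens tokens, then decode back to a string.
    return enc.decode(tokens[:max_tokens])

long_description = "Oslo is the capital of Norway. " * 400
print(clip_to_token_budget(long_description, max_tokens=128))
```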