feat: add history_messages to track pipeline processing progress

• Add a shared history_messages list to pipeline_status
• Record pipeline progress messages in both latest_message and history_messages
yangdx
2025-02-28 13:53:40 +08:00
parent b090a22be7
commit 8cd45161f2
4 changed files with 77 additions and 18 deletions
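
The change hinges on `multiprocessing.Manager` shared objects: a list created by the manager process can be appended to from any worker and read from any other. A minimal, standalone sketch of that mechanism (illustration only, not LightRAG code):

```python
# Demonstrates the sharing mechanism this commit relies on: a Manager-backed
# list created in the parent is visible to child processes, so progress
# messages appended anywhere show up everywhere.
from multiprocessing import Manager, Process


def worker(messages) -> None:
    messages.append("hello from worker")  # proxied to the manager process


if __name__ == "__main__":
    manager = Manager()
    history_messages = manager.list()  # shared list, as in the diffs below
    p = Process(target=worker, args=(history_messages,))
    p.start()
    p.join()
    print(list(history_messages))  # ['hello from worker']
```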

View File

@@ -672,6 +672,10 @@ def create_document_routes(
 # Convert to regular dict if it's a Manager.dict
 status_dict = dict(pipeline_status)
+# Convert history_messages to a regular list if it's a Manager.list
+if "history_messages" in status_dict:
+    status_dict["history_messages"] = list(status_dict["history_messages"])
 # Format the job_start time if it exists
 if status_dict.get("job_start"):
     status_dict["job_start"] = str(status_dict["job_start"])

View File

@@ -83,6 +83,10 @@ def initialize_share_data(workers: int = 1):
 # Initialize pipeline status for document indexing control
 pipeline_namespace = get_namespace_data("pipeline_status")
+
+# Create a shared list object for history_messages
+history_messages = _manager.list() if is_multiprocess else []
+
 pipeline_namespace.update({
     "busy": False,  # Control concurrent processes
     "job_name": "Default Job",  # Current job name (indexing files/indexing texts)
@@ -91,7 +95,8 @@ def initialize_share_data(workers: int = 1):
"batchs": 0, # Number of batches for processing documents "batchs": 0, # Number of batches for processing documents
"cur_batch": 0, # Current processing batch "cur_batch": 0, # Current processing batch
"request_pending": False, # Flag for pending request for processing "request_pending": False, # Flag for pending request for processing
"latest_message": "" # Latest message from pipeline processing "latest_message": "", # Latest message from pipeline processing
"history_messages": history_messages, # 使用共享列表对象
}) })
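
The list is created once at startup and stored inside the namespace dict, so every consumer that later looks it up receives a proxy to the same underlying object; in single-process mode a plain list suffices. A condensed sketch of the pattern (variable names mirror the diff; `is_multiprocess` is taken as given here):

```python
from multiprocessing import Manager

if __name__ == "__main__":
    is_multiprocess = True  # assumed for this sketch
    _manager = Manager()

    pipeline_namespace = _manager.dict() if is_multiprocess else {}
    history_messages = _manager.list() if is_multiprocess else []
    pipeline_namespace.update({
        "busy": False,
        "latest_message": "",
        "history_messages": history_messages,  # one object, shared by all
    })
    # Later lookups return a proxy to the same shared list:
    pipeline_namespace["history_messages"].append("pipeline started")
    print(list(history_messages))  # ['pipeline started']
```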

View File

@@ -681,6 +681,13 @@ class LightRAG:
 with storage_lock:
     if not pipeline_status.get("busy", False):
         # No other process is busy, we can process documents
+        # Get the current history_messages list
+        current_history = pipeline_status.get("history_messages", [])
+        # Clear the list contents but keep the same list object
+        if hasattr(current_history, "clear"):
+            current_history.clear()
         pipeline_status.update({
             "busy": True,
             "job_name": "indexing files",
@@ -688,7 +695,10 @@ class LightRAG:
"docs": 0, "docs": 0,
"batchs": 0, "batchs": 0,
"cur_batch": 0, "cur_batch": 0,
"request_pending": False # Clear any previous request "request_pending": False, # Clear any previous request
"latest_message": "",
# 保持使用同一个列表对象
"history_messages": current_history,
}) })
process_documents = True process_documents = True
else: else:
@@ -715,7 +725,10 @@ class LightRAG:
 to_process_docs.update(pending_docs)

 if not to_process_docs:
-    logger.info("All documents have been processed or are duplicates")
+    log_message = "All documents have been processed or are duplicates"
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
     break

 # Update pipeline status with document count (with lock)
@@ -734,7 +747,10 @@ class LightRAG:
"cur_batch": 0 "cur_batch": 0
}) })
logger.info(f"Number of batches to process: {len(docs_batches)}.") log_message = f"Number of batches to process: {len(docs_batches)}."
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
batches: list[Any] = [] batches: list[Any] = []
# 3. iterate over batches # 3. iterate over batches
@@ -747,7 +763,10 @@ class LightRAG:
     docs_batch: list[tuple[str, DocProcessingStatus]],
     size_batch: int,
 ) -> None:
-    logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
+    log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
     # 4. iterate over batch
     for doc_id_processing_status in docs_batch:
         doc_id, status_doc = doc_id_processing_status
@@ -818,7 +837,10 @@ class LightRAG:
             }
         )
         continue
-    logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
+    log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)

 batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
@@ -836,13 +858,19 @@ class LightRAG:
 if not has_pending_request:
     break

-logger.info("Processing additional documents due to pending request")
+log_message = "Processing additional documents due to pending request"
+logger.info(log_message)
+pipeline_status["latest_message"] = log_message
+pipeline_status["history_messages"].append(log_message)

 finally:
     # Always reset busy status when done or if an exception occurs (with lock)
     with storage_lock:
         pipeline_status["busy"] = False
-        logger.info("Document processing pipeline completed")
+        log_message = "Document processing pipeline completed"
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)

 async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
     try:
@@ -873,7 +901,15 @@ class LightRAG:
         if storage_inst is not None
     ]
     await asyncio.gather(*tasks)
-    logger.info("All Insert done")
+
+    log_message = "All Insert done"
+    logger.info(log_message)
+    # Get pipeline_status and update latest_message and history_messages
+    from lightrag.kg.shared_storage import get_namespace_data
+    pipeline_status = get_namespace_data("pipeline_status")
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)

 def insert_custom_kg(self, custom_kg: dict[str, Any]) -> None:
     loop = always_get_an_event_loop()
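
Note why the pipeline entry point clears the existing list instead of assigning a fresh one: replacing the value would detach any process still holding the old proxy. The `hasattr` guard is there because `ListProxy` did not expose `clear()` on older Python versions; a slice-delete is an everywhere-available fallback. A small sketch of the trade-off (illustration only):

```python
from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()
    status = manager.dict()
    status["history_messages"] = manager.list(["old message"])

    history = status["history_messages"]  # proxy to the shared list
    # status["history_messages"] = []     # would break sharing: other
    #                                     # processes keep the old proxy
    if hasattr(history, "clear"):
        history.clear()                   # exposed on newer Pythons
    else:
        del history[:]                    # slice-delete works everywhere

    history.append("new message")
    print(list(status["history_messages"]))  # ['new message']
```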

View File

@@ -336,6 +336,9 @@ async def extract_entities(
     global_config: dict[str, str],
     llm_response_cache: BaseKVStorage | None = None,
 ) -> None:
+    # Fetch pipeline_status at the start of the function
+    from lightrag.kg.shared_storage import get_namespace_data
+    pipeline_status = get_namespace_data("pipeline_status")
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
     enable_llm_cache_for_entity_extract: bool = global_config[
@@ -496,9 +499,10 @@ async def extract_entities(
     processed_chunks += 1
     entities_count = len(maybe_nodes)
     relations_count = len(maybe_edges)
-    logger.info(
-        f"  Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
-    )
+    log_message = f"  Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
     return dict(maybe_nodes), dict(maybe_edges)

 tasks = [_process_single_content(c) for c in ordered_chunks]
@@ -527,17 +531,27 @@ async def extract_entities(
 )

 if not (all_entities_data or all_relationships_data):
-    logger.info("Didn't extract any entities and relationships.")
+    log_message = "Didn't extract any entities and relationships."
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
     return

 if not all_entities_data:
-    logger.info("Didn't extract any entities")
+    log_message = "Didn't extract any entities"
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
 if not all_relationships_data:
-    logger.info("Didn't extract any relationships")
+    log_message = "Didn't extract any relationships"
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)

-logger.info(
-    f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
-)
+log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
+logger.info(log_message)
+pipeline_status["latest_message"] = log_message
+pipeline_status["history_messages"].append(log_message)
 verbose_debug(
     f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
 )
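
Taken together, the changes make pipeline progress observable over the API: each log line is mirrored into `latest_message` and appended to `history_messages`, and the documents route returns both. A hypothetical client-side sketch (the route path and port are assumptions, not confirmed by this diff):

```python
import time

import requests


def watch_pipeline(base_url: str = "http://localhost:9621") -> None:
    """Print new pipeline messages until the pipeline reports idle."""
    seen = 0
    while True:
        status = requests.get(f"{base_url}/documents/pipeline_status").json()
        messages = status.get("history_messages", [])
        for msg in messages[seen:]:  # only messages we haven't printed yet
            print(msg)
        seen = len(messages)
        if not status.get("busy", False):
            break
        time.sleep(2)


if __name__ == "__main__":
    watch_pipeline()
```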