diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index d2ff91f1..e274f4c4 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -672,6 +672,10 @@ def create_document_routes(
         # Convert to regular dict if it's a Manager.dict
         status_dict = dict(pipeline_status)

+        # Convert history_messages to a regular list if it's a Manager.list
+        if "history_messages" in status_dict:
+            status_dict["history_messages"] = list(status_dict["history_messages"])
+
         # Format the job_start time if it exists
         if status_dict.get("job_start"):
             status_dict["job_start"] = str(status_dict["job_start"])
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index a4970321..3a21dc5c 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -83,6 +83,10 @@ def initialize_share_data(workers: int = 1):

     # Initialize pipeline status for document indexing control
     pipeline_namespace = get_namespace_data("pipeline_status")
+
+    # Create a shared list object for history_messages
+    history_messages = _manager.list() if is_multiprocess else []
+
     pipeline_namespace.update({
         "busy": False,  # Control concurrent processes
         "job_name": "Default Job",  # Current job name (indexing files/indexing texts)
@@ -91,7 +95,8 @@
         "batchs": 0,  # Number of batches for processing documents
         "cur_batch": 0,  # Current processing batch
         "request_pending": False,  # Flag for pending request for processing
-        "latest_message": ""  # Latest message from pipeline processing
+        "latest_message": "",  # Latest message from pipeline processing
+        "history_messages": history_messages,  # Use the shared list object
     })
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b95da952..ee5bc397 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -681,6 +681,13 @@ class LightRAG:
         with storage_lock:
             if not pipeline_status.get("busy", False):
                 # No other process is busy, we can process documents
+                # Get the current history_messages list
+                current_history = pipeline_status.get("history_messages", [])
+
+                # Clear the list contents but keep the same list object
+                if hasattr(current_history, "clear"):
+                    current_history.clear()
+
                 pipeline_status.update({
                     "busy": True,
                     "job_name": "indexing files",
@@ -688,7 +695,10 @@
                     "docs": 0,
                     "batchs": 0,
                     "cur_batch": 0,
-                    "request_pending": False  # Clear any previous request
+                    "request_pending": False,  # Clear any previous request
+                    "latest_message": "",
+                    # Keep using the same list object
+                    "history_messages": current_history,
                 })
                 process_documents = True
             else:
@@ -715,7 +725,10 @@
                 to_process_docs.update(pending_docs)

                 if not to_process_docs:
-                    logger.info("All documents have been processed or are duplicates")
+                    log_message = "All documents have been processed or are duplicates"
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
                     break

                 # Update pipeline status with document count (with lock)
@@ -734,7 +747,10 @@
                         "cur_batch": 0
                     })

-                logger.info(f"Number of batches to process: {len(docs_batches)}.")
+                log_message = f"Number of batches to process: {len(docs_batches)}."
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)

                 batches: list[Any] = []
                 # 3. iterate over batches
@@ -747,7 +763,10 @@
                     docs_batch: list[tuple[str, DocProcessingStatus]],
                     size_batch: int,
                 ) -> None:
-                    logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
+                    log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
                     # 4. iterate over batch
                     for doc_id_processing_status in docs_batch:
                         doc_id, status_doc = doc_id_processing_status
@@ -818,7 +837,10 @@
                                 }
                             )
                             continue

-                    logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
+                    log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)

                 batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
@@ -836,13 +858,19 @@
                     if not has_pending_request:
                         break

-                    logger.info("Processing additional documents due to pending request")
+                    log_message = "Processing additional documents due to pending request"
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)

         finally:
             # Always reset busy status when done or if an exception occurs (with lock)
             with storage_lock:
                 pipeline_status["busy"] = False
-                logger.info("Document processing pipeline completed")
+                log_message = "Document processing pipeline completed"
+                logger.info(log_message)
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)

     async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
         try:
@@ -873,7 +901,15 @@
                 if storage_inst is not None
             ]
             await asyncio.gather(*tasks)
-            logger.info("All Insert done")
+
+            log_message = "All Insert done"
+            logger.info(log_message)
+
+            # Get pipeline_status and update latest_message and history_messages
+            from lightrag.kg.shared_storage import get_namespace_data
+            pipeline_status = get_namespace_data("pipeline_status")
+            pipeline_status["latest_message"] = log_message
+            pipeline_status["history_messages"].append(log_message)

     def insert_custom_kg(self, custom_kg: dict[str, Any]) -> None:
         loop = always_get_an_event_loop()
diff --git a/lightrag/operate.py b/lightrag/operate.py
index e3f445bb..44a68655 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -336,6 +336,9 @@ async def extract_entities(
     global_config: dict[str, str],
     llm_response_cache: BaseKVStorage | None = None,
 ) -> None:
+    # Get pipeline_status at the start of the function
+    from lightrag.kg.shared_storage import get_namespace_data
+    pipeline_status = get_namespace_data("pipeline_status")
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
     enable_llm_cache_for_entity_extract: bool = global_config[
@@ -496,9 +499,10 @@
         processed_chunks += 1
         entities_count = len(maybe_nodes)
         relations_count = len(maybe_edges)
-        logger.info(
-            f"  Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
-        )
+        log_message = f"  Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)
         return dict(maybe_nodes), dict(maybe_edges)

     tasks = [_process_single_content(c) for c in ordered_chunks]
@@ -527,17 +531,27 @@
     )

     if not (all_entities_data or all_relationships_data):
-        logger.info("Didn't extract any entities and relationships.")
+        log_message = "Didn't extract any entities and relationships."
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)
         return

     if not all_entities_data:
-        logger.info("Didn't extract any entities")
+        log_message = "Didn't extract any entities"
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)
     if not all_relationships_data:
-        logger.info("Didn't extract any relationships")
+        log_message = "Didn't extract any relationships"
+        logger.info(log_message)
+        pipeline_status["latest_message"] = log_message
+        pipeline_status["history_messages"].append(log_message)

-    logger.info(
-        f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
-    )
+    log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
+    logger.info(log_message)
+    pipeline_status["latest_message"] = log_message
+    pipeline_status["history_messages"].append(log_message)
     verbose_debug(
         f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
     )
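
Taken together, the diff threads one shared list through the whole pipeline: `shared_storage.py` creates it (a `Manager().list()` in multi-worker mode, a plain `list` otherwise), `lightrag.py` and `operate.py` append progress messages to it, and `document_routes.py` copies it into a plain list before serializing the status response. Below is a minimal standalone sketch of that pattern, not part of the diff; all names are illustrative.

```python
# Sketch of the shared-list pattern used in the diff, using only the
# stdlib multiprocessing module. Names here are illustrative.
from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()
    pipeline_status = manager.dict()
    pipeline_status["history_messages"] = manager.list()

    # Every process that reads pipeline_status["history_messages"] gets a
    # proxy to the *same* underlying list, so appends are visible everywhere.
    history = pipeline_status["history_messages"]
    history.append("Document processing pipeline started")

    # Reset between jobs: empty the list in place rather than rebinding the
    # key to a fresh [], which would orphan proxies held by other processes.
    # ListProxy exposes clear() only on newer Python versions, which is
    # presumably why the diff guards with hasattr(current_history, "clear");
    # slice deletion works everywhere.
    del history[:]
    history.append("indexing files")

    # A ListProxy is not JSON-serializable, hence the list(...) copy made in
    # document_routes.py before the status dict is returned from the API.
    print(list(pipeline_status["history_messages"]))  # -> ['indexing files']
```

Mutating the proxy in place is the design point: the API worker and the indexing worker observe the same message history without any extra IPC, and the route handler only needs a cheap `list(...)` copy at serialization time.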