From 3cca18c59c3d81cad721154a78673d131593d4b2 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 10 Mar 2025 16:48:59 +0800
Subject: [PATCH] Refactor pipeline status updates and entity extraction.

- Let all parallel jobs share one pipeline_status object
- Improve thread safety with pipeline_status_lock
- Only pipeline jobs can add messages to pipeline_status
- Mark insert_custom_chunks as deprecated
---
 lightrag/lightrag.py | 22 ++++++++++++----------
 lightrag/operate.py  | 35 ++++++++++++++++++++++-------------
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 6abd7a17..5b42fa3d 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -583,6 +583,7 @@ class LightRAG:
             split_by_character, split_by_character_only
         )
 
+    # TODO: deprecated, use insert instead
     def insert_custom_chunks(
         self,
         full_text: str,
@@ -594,6 +595,7 @@
             self.ainsert_custom_chunks(full_text, text_chunks, doc_id)
         )
 
+    # TODO: deprecated, use ainsert instead
     async def ainsert_custom_chunks(
         self, full_text: str, text_chunks: list[str], doc_id: str | None = None
     ) -> None:
@@ -885,7 +887,7 @@ class LightRAG:
                     self.chunks_vdb.upsert(chunks)
                 )
                 entity_relation_task = asyncio.create_task(
-                    self._process_entity_relation_graph(chunks)
+                    self._process_entity_relation_graph(chunks, pipeline_status, pipeline_status_lock)
                 )
                 full_docs_task = asyncio.create_task(
                     self.full_docs.upsert(
@@ -1000,21 +1002,23 @@
             pipeline_status["latest_message"] = log_message
             pipeline_status["history_messages"].append(log_message)
 
-    async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
+    async def _process_entity_relation_graph(self, chunk: dict[str, Any], pipeline_status=None, pipeline_status_lock=None) -> None:
         try:
             await extract_entities(
                 chunk,
                 knowledge_graph_inst=self.chunk_entity_relation_graph,
                 entity_vdb=self.entities_vdb,
                 relationships_vdb=self.relationships_vdb,
-                llm_response_cache=self.llm_response_cache,
                 global_config=asdict(self),
+                pipeline_status=pipeline_status,
+                pipeline_status_lock=pipeline_status_lock,
+                llm_response_cache=self.llm_response_cache,
             )
         except Exception as e:
             logger.error("Failed to extract entities and relationships")
             raise e
 
-    async def _insert_done(self) -> None:
+    async def _insert_done(self, pipeline_status=None, pipeline_status_lock=None) -> None:
         tasks = [
             cast(StorageNameSpace, storage_inst).index_done_callback()
             for storage_inst in [  # type: ignore
@@ -1033,12 +1037,10 @@
         log_message = "All Insert done"
         logger.info(log_message)
 
-        # Fetch pipeline_status and update latest_message and history_messages
-        from lightrag.kg.shared_storage import get_namespace_data
-
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status["latest_message"] = log_message
-        pipeline_status["history_messages"].append(log_message)
+        if pipeline_status is not None and pipeline_status_lock is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
 
     def insert_custom_kg(
         self, custom_kg: dict[str, Any], full_doc_id: str = None
diff --git a/lightrag/operate.py b/lightrag/operate.py
index ba39fe89..5d6b7c7d 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -340,11 +340,10 @@ async def extract_entities(
     entity_vdb: BaseVectorStorage,
     relationships_vdb: BaseVectorStorage,
     global_config: dict[str, str],
+    pipeline_status: dict = None,
+    pipeline_status_lock=None,
     llm_response_cache: BaseKVStorage | None = None,
 ) -> None:
-    from lightrag.kg.shared_storage import get_namespace_data
-
-    pipeline_status = await get_namespace_data("pipeline_status")
     use_llm_func: callable = global_config["llm_model_func"]
     entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
     enable_llm_cache_for_entity_extract: bool = global_config[
@@ -507,8 +506,10 @@
             relations_count = len(maybe_edges)
             log_message = f" Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
             logger.info(log_message)
-            pipeline_status["latest_message"] = log_message
-            pipeline_status["history_messages"].append(log_message)
+            if pipeline_status is not None:
+                async with pipeline_status_lock:
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
             return dict(maybe_nodes), dict(maybe_edges)
 
     tasks = [_process_single_content(c) for c in ordered_chunks]
@@ -547,25 +548,33 @@
     if not (all_entities_data or all_relationships_data):
         log_message = "Didn't extract any entities and relationships."
         logger.info(log_message)
-        pipeline_status["latest_message"] = log_message
-        pipeline_status["history_messages"].append(log_message)
+        if pipeline_status is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
         return
 
     if not all_entities_data:
         log_message = "Didn't extract any entities"
         logger.info(log_message)
-        pipeline_status["latest_message"] = log_message
-        pipeline_status["history_messages"].append(log_message)
+        if pipeline_status is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
     if not all_relationships_data:
         log_message = "Didn't extract any relationships"
         logger.info(log_message)
-        pipeline_status["latest_message"] = log_message
-        pipeline_status["history_messages"].append(log_message)
+        if pipeline_status is not None:
+            async with pipeline_status_lock:
+                pipeline_status["latest_message"] = log_message
+                pipeline_status["history_messages"].append(log_message)
 
     log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
     logger.info(log_message)
-    pipeline_status["latest_message"] = log_message
-    pipeline_status["history_messages"].append(log_message)
+    if pipeline_status is not None:
+        async with pipeline_status_lock:
+            pipeline_status["latest_message"] = log_message
+            pipeline_status["history_messages"].append(log_message)
 
     verbose_debug(
         f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
     )
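
Note: the guarded-update pattern this patch threads through the call chain can be
summarized with a minimal, self-contained sketch. The report helper and the two
module-level objects below are illustrative stand-ins, not part of the patch;
in the patch, the pipeline job owns the shared status dict and lock and passes
both down to workers such as extract_entities().

import asyncio

# Hypothetical stand-ins for the objects the pipeline job creates and shares.
pipeline_status: dict = {"latest_message": "", "history_messages": []}
pipeline_status_lock = asyncio.Lock()

async def report(log_message: str, pipeline_status=None, pipeline_status_lock=None) -> None:
    # Same pattern as the diff: write to the shared status object only when
    # the pipeline supplied one, and only while holding the lock, so parallel
    # jobs cannot interleave their updates.
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = log_message
            pipeline_status["history_messages"].append(log_message)

async def main() -> None:
    # Several parallel jobs appending to the one shared status object.
    await asyncio.gather(
        *(report(f"job {i} done", pipeline_status, pipeline_status_lock) for i in range(3))
    )
    print(pipeline_status["history_messages"])

asyncio.run(main())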