Refactor pipeline status updates and entity extraction.

- Let all parrallel jobs using one pipe_status objects
- Improved thread safety with pipeline_status_lock
- Only pipeline jobs can add message to pipe_status
- Marked insert_custom_chunks as deprecated
This commit is contained in:
yangdx
2025-03-10 16:48:59 +08:00
parent 57a41eedb8
commit 3cca18c59c
2 changed files with 34 additions and 23 deletions

View File

@@ -583,6 +583,7 @@ class LightRAG:
split_by_character, split_by_character_only split_by_character, split_by_character_only
) )
# TODO: deprecated, use insert instead
def insert_custom_chunks( def insert_custom_chunks(
self, self,
full_text: str, full_text: str,
@@ -594,6 +595,7 @@ class LightRAG:
self.ainsert_custom_chunks(full_text, text_chunks, doc_id) self.ainsert_custom_chunks(full_text, text_chunks, doc_id)
) )
# TODO: deprecated, use ainsert instead
async def ainsert_custom_chunks( async def ainsert_custom_chunks(
self, full_text: str, text_chunks: list[str], doc_id: str | None = None self, full_text: str, text_chunks: list[str], doc_id: str | None = None
) -> None: ) -> None:
@@ -885,7 +887,7 @@ class LightRAG:
self.chunks_vdb.upsert(chunks) self.chunks_vdb.upsert(chunks)
) )
entity_relation_task = asyncio.create_task( entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(chunks) self._process_entity_relation_graph(chunks, pipeline_status, pipeline_status_lock)
) )
full_docs_task = asyncio.create_task( full_docs_task = asyncio.create_task(
self.full_docs.upsert( self.full_docs.upsert(
@@ -1000,21 +1002,23 @@ class LightRAG:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None: async def _process_entity_relation_graph(self, chunk: dict[str, Any], pipeline_status=None, pipeline_status_lock=None) -> None:
try: try:
await extract_entities( await extract_entities(
chunk, chunk,
knowledge_graph_inst=self.chunk_entity_relation_graph, knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb, entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb, relationships_vdb=self.relationships_vdb,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self), global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache,
) )
except Exception as e: except Exception as e:
logger.error("Failed to extract entities and relationships") logger.error("Failed to extract entities and relationships")
raise e raise e
async def _insert_done(self) -> None: async def _insert_done(self, pipeline_status=None, pipeline_status_lock=None) -> None:
tasks = [ tasks = [
cast(StorageNameSpace, storage_inst).index_done_callback() cast(StorageNameSpace, storage_inst).index_done_callback()
for storage_inst in [ # type: ignore for storage_inst in [ # type: ignore
@@ -1033,12 +1037,10 @@ class LightRAG:
log_message = "All Insert done" log_message = "All Insert done"
logger.info(log_message) logger.info(log_message)
# 获取 pipeline_status 并更新 latest_message 和 history_messages if pipeline_status is not None and pipeline_status_lock is not None:
from lightrag.kg.shared_storage import get_namespace_data async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status["history_messages"].append(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
def insert_custom_kg( def insert_custom_kg(
self, custom_kg: dict[str, Any], full_doc_id: str = None self, custom_kg: dict[str, Any], full_doc_id: str = None

View File

@@ -340,11 +340,10 @@ async def extract_entities(
entity_vdb: BaseVectorStorage, entity_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage,
global_config: dict[str, str], global_config: dict[str, str],
pipeline_status: dict = None,
pipeline_status_lock = None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
) -> None: ) -> None:
from lightrag.kg.shared_storage import get_namespace_data
pipeline_status = await get_namespace_data("pipeline_status")
use_llm_func: callable = global_config["llm_model_func"] use_llm_func: callable = global_config["llm_model_func"]
entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"] entity_extract_max_gleaning = global_config["entity_extract_max_gleaning"]
enable_llm_cache_for_entity_extract: bool = global_config[ enable_llm_cache_for_entity_extract: bool = global_config[
@@ -507,8 +506,10 @@ async def extract_entities(
relations_count = len(maybe_edges) relations_count = len(maybe_edges)
log_message = f" Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)" log_message = f" Chunk {processed_chunks}/{total_chunks}: extracted {entities_count} entities and {relations_count} relationships (deduplicated)"
logger.info(log_message) logger.info(log_message)
pipeline_status["latest_message"] = log_message if pipeline_status is not None:
pipeline_status["history_messages"].append(log_message) async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return dict(maybe_nodes), dict(maybe_edges) return dict(maybe_nodes), dict(maybe_edges)
tasks = [_process_single_content(c) for c in ordered_chunks] tasks = [_process_single_content(c) for c in ordered_chunks]
@@ -547,25 +548,33 @@ async def extract_entities(
if not (all_entities_data or all_relationships_data): if not (all_entities_data or all_relationships_data):
log_message = "Didn't extract any entities and relationships." log_message = "Didn't extract any entities and relationships."
logger.info(log_message) logger.info(log_message)
pipeline_status["latest_message"] = log_message if pipeline_status is not None:
pipeline_status["history_messages"].append(log_message) async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return return
if not all_entities_data: if not all_entities_data:
log_message = "Didn't extract any entities" log_message = "Didn't extract any entities"
logger.info(log_message) logger.info(log_message)
pipeline_status["latest_message"] = log_message if pipeline_status is not None:
pipeline_status["history_messages"].append(log_message) async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
if not all_relationships_data: if not all_relationships_data:
log_message = "Didn't extract any relationships" log_message = "Didn't extract any relationships"
logger.info(log_message) logger.info(log_message)
pipeline_status["latest_message"] = log_message if pipeline_status is not None:
pipeline_status["history_messages"].append(log_message) async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)" log_message = f"Extracted {len(all_entities_data)} entities and {len(all_relationships_data)} relationships (deduplicated)"
logger.info(log_message) logger.info(log_message)
pipeline_status["latest_message"] = log_message if pipeline_status is not None:
pipeline_status["history_messages"].append(log_message) async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
verbose_debug( verbose_debug(
f"New entities:{all_entities_data}, relationships:{all_relationships_data}" f"New entities:{all_entities_data}, relationships:{all_relationships_data}"
) )