diff --git a/env.example b/env.example
index b5242cc1..cd92abc8 100644
--- a/env.example
+++ b/env.example
@@ -48,7 +48,7 @@
 # CHUNK_OVERLAP_SIZE=100
 # MAX_TOKENS=32768                 # Max tokens send to LLM for summarization
 # MAX_TOKEN_SUMMARY=500            # Max tokens for entity or relations summary
-# LANGUAGE=English
+# SUMMARY_LANGUAGE=English
 # MAX_EMBED_TOKENS=8192
 
 ### LLM Configuration (Use valid host. For local services installed with docker, you can use host.docker.internal)
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 4da612fa..49ca9b45 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -337,9 +337,6 @@ def create_app(args):
                 "use_llm_check": False,
             },
             namespace_prefix=args.namespace_prefix,
-            addon_params={
-                "language": args.language,
-            },
             auto_manage_storages_states=False,
         )
     else:  # azure_openai
diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py
index 47652219..d7622ac0 100644
--- a/lightrag/api/utils_api.py
+++ b/lightrag/api/utils_api.py
@@ -396,7 +396,6 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
     # Inject chunk configuration
     args.chunk_size = get_env_value("CHUNK_SIZE", 1200, int)
     args.chunk_overlap_size = get_env_value("CHUNK_OVERLAP_SIZE", 100, int)
-    args.language = get_env_value("LANGUAGE", "English")
 
     ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name
 
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index cad1826d..e8c2bb9c 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -845,7 +845,8 @@ class LightRAG:
                         )
                     }
                     # Process document (text chunks and full docs) in parallel
-                    tasks = [
+                    # Create tasks with references for potential cancellation
+                    doc_status_task = asyncio.create_task(
                         self.doc_status.upsert(
                             {
                                 doc_id: {
@@ -857,13 +858,28 @@ class LightRAG:
                                     "created_at": status_doc.created_at,
                                 }
                             }
-                        ),
-                        self.chunks_vdb.upsert(chunks),
-                        self._process_entity_relation_graph(chunks),
+                        )
+                    )
+                    chunks_vdb_task = asyncio.create_task(
+                        self.chunks_vdb.upsert(chunks)
+                    )
+                    entity_relation_task = asyncio.create_task(
+                        self._process_entity_relation_graph(chunks)
+                    )
+                    full_docs_task = asyncio.create_task(
                         self.full_docs.upsert(
                             {doc_id: {"content": status_doc.content}}
-                        ),
-                        self.text_chunks.upsert(chunks),
+                        )
+                    )
+                    text_chunks_task = asyncio.create_task(
+                        self.text_chunks.upsert(chunks)
+                    )
+                    tasks = [
+                        doc_status_task,
+                        chunks_vdb_task,
+                        entity_relation_task,
+                        full_docs_task,
+                        text_chunks_task,
                     ]
                     try:
                         await asyncio.gather(*tasks)
@@ -881,9 +897,25 @@ class LightRAG:
                             }
                         )
                     except Exception as e:
-                        logger.error(
+                        # Log error and update pipeline status
+                        error_msg = (
                             f"Failed to process document {doc_id}: {str(e)}"
                         )
+                        logger.error(error_msg)
+                        pipeline_status["latest_message"] = error_msg
+                        pipeline_status["history_messages"].append(error_msg)
+
+                        # Cancel other tasks as they are no longer meaningful
+                        for task in [
+                            chunks_vdb_task,
+                            entity_relation_task,
+                            full_docs_task,
+                            text_chunks_task,
+                        ]:
+                            if not task.done():
+                                task.cancel()
+
+                        # Update document status to failed
                         await self.doc_status.upsert(
                             {
                                 doc_id: {
@@ -926,7 +958,7 @@ class LightRAG:
                 pipeline_status["latest_message"] = log_message
                 pipeline_status["history_messages"].append(log_message)
 
-                # 获取新的待处理文档
+                # Check for pending documents again
                 processing_docs, failed_docs, pending_docs = await asyncio.gather(
                     self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                     self.doc_status.get_docs_by_status(DocStatus.FAILED),
@@ -1949,6 +1981,9 @@ class LightRAG:
                     new_entity_name, new_node_data
                 )
 
+                # Store relationships that need to be updated
+                relations_to_update = []
+
                 # Get all edges related to the original entity
                 edges = await self.chunk_entity_relation_graph.get_node_edges(
                     entity_name
@@ -1964,10 +1999,16 @@ class LightRAG:
                                 await self.chunk_entity_relation_graph.upsert_edge(
                                     new_entity_name, target, edge_data
                                 )
+                                relations_to_update.append(
+                                    (new_entity_name, target, edge_data)
+                                )
                             else:  # target == entity_name
                                 await self.chunk_entity_relation_graph.upsert_edge(
                                     source, new_entity_name, edge_data
                                 )
+                                relations_to_update.append(
+                                    (source, new_entity_name, edge_data)
+                                )
 
                 # Delete old entity
                 await self.chunk_entity_relation_graph.delete_node(entity_name)
@@ -1976,6 +2017,35 @@ class LightRAG:
                 old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
                 await self.entities_vdb.delete([old_entity_id])
 
+                # Update relationship vector representations
+                for src, tgt, edge_data in relations_to_update:
+                    description = edge_data.get("description", "")
+                    keywords = edge_data.get("keywords", "")
+                    source_id = edge_data.get("source_id", "")
+                    weight = float(edge_data.get("weight", 1.0))
+
+                    # Create new content for embedding
+                    content = f"{src}\t{tgt}\n{keywords}\n{description}"
+
+                    # Calculate relationship ID
+                    relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
+
+                    # Prepare data for vector database update
+                    relation_data = {
+                        relation_id: {
+                            "content": content,
+                            "src_id": src,
+                            "tgt_id": tgt,
+                            "source_id": source_id,
+                            "description": description,
+                            "keywords": keywords,
+                            "weight": weight,
+                        }
+                    }
+
+                    # Update vector database
+                    await self.relationships_vdb.upsert(relation_data)
+
                 # Update working entity name to new name
                 entity_name = new_entity_name
             else:
@@ -2086,7 +2156,7 @@ class LightRAG:
             weight = float(new_edge_data.get("weight", 1.0))
 
             # Create content for embedding
-            content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
+            content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}"
 
             # Calculate relation ID
             relation_id = compute_mdhash_id(
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 7db42284..30983145 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -1242,9 +1242,11 @@ async def _find_most_related_text_unit_from_entities(
 
     all_text_units_lookup = {}
     tasks = []
+
     for index, (this_text_units, this_edges) in enumerate(zip(text_units, edges)):
         for c_id in this_text_units:
             if c_id not in all_text_units_lookup:
+                all_text_units_lookup[c_id] = index
                 tasks.append((c_id, index, this_edges))
 
     results = await asyncio.gather(
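The core behavioral change in `lightrag.py` is that the per-document upserts are now created as named `asyncio` tasks instead of bare coroutines handed to `gather`, so that when one upsert fails the siblings can be explicitly cancelled before the document is marked as failed. Below is a minimal, self-contained sketch of that pattern; `upsert`, the delays, and `process_document` are placeholders invented for illustration, not LightRAG APIs.

```python
import asyncio


async def upsert(name: str, delay: float, fail: bool = False) -> None:
    """Placeholder for a storage upsert: sleeps, then optionally raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} upsert failed")
    print(f"{name} upsert finished")


async def process_document() -> None:
    # Create tasks up front so each one can be referenced (and cancelled)
    # later, mirroring doc_status_task / chunks_vdb_task / ... in the diff.
    doc_status_task = asyncio.create_task(upsert("doc_status", 0.01))
    chunks_vdb_task = asyncio.create_task(upsert("chunks_vdb", 0.05, fail=True))
    text_chunks_task = asyncio.create_task(upsert("text_chunks", 5.0))

    tasks = [doc_status_task, chunks_vdb_task, text_chunks_task]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        # gather() propagates the first failure but leaves the other tasks
        # running, so cancel whatever has not completed yet.
        print(f"Failed to process document: {e}")
        for task in tasks:
            if not task.done():
                task.cancel()
        # Let the cancelled tasks finish unwinding before moving on
        # (e.g. to marking the document as FAILED).
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(process_document())
```

The other thread running through the `lightrag.py` hunks is embedding consistency after an entity rename: edges touched by the rename are collected in `relations_to_update` and re-upserted into `relationships_vdb`, and the later hunk switches the relation-editing path to the same `{src}\t{tgt}\n{keywords}\n{description}` content layout, so vectors written by both paths share one format.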