Merge branch 'HKUDS:main' into feat_login-jwt

Author: Milin
Committed by: GitHub
Date: 2025-03-05 20:21:21 +08:00

5 changed files with 82 additions and 14 deletions

View File

@@ -48,7 +48,7 @@
 # CHUNK_OVERLAP_SIZE=100
 # MAX_TOKENS=32768  # Max tokens send to LLM for summarization
 # MAX_TOKEN_SUMMARY=500  # Max tokens for entity or relations summary
-# LANGUAGE=English
+# SUMMARY_LANGUAGE=English
 # MAX_EMBED_TOKENS=8192
 ### LLM Configuration (Use valid host. For local services installed with docker, you can use host.docker.internal)

View File

@@ -337,9 +337,6 @@ def create_app(args):
"use_llm_check": False, "use_llm_check": False,
}, },
namespace_prefix=args.namespace_prefix, namespace_prefix=args.namespace_prefix,
addon_params={
"language": args.language,
},
auto_manage_storages_states=False, auto_manage_storages_states=False,
) )
else: # azure_openai else: # azure_openai

View File

@@ -396,7 +396,6 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
     # Inject chunk configuration
     args.chunk_size = get_env_value("CHUNK_SIZE", 1200, int)
     args.chunk_overlap_size = get_env_value("CHUNK_OVERLAP_SIZE", 100, int)
-    args.language = get_env_value("LANGUAGE", "English")
     ollama_server_infos.LIGHTRAG_MODEL = args.simulated_model_name
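With args.language and the addon_params pass-through removed, the summary language appears to be configured only through the renamed SUMMARY_LANGUAGE environment variable rather than the API layer's argparse args. A minimal sketch of reading such a setting (how the core library actually consumes it is an assumption, not shown in this diff):

import os

# Assumption: the summary language is read straight from the environment
# instead of being threaded through create_app's arguments.
summary_language = os.getenv("SUMMARY_LANGUAGE", "English")
print(f"Summaries will be written in {summary_language}")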

View File

@@ -845,7 +845,8 @@ class LightRAG:
                     )
                 }
                 # Process document (text chunks and full docs) in parallel
-                tasks = [
+                # Create tasks with references for potential cancellation
+                doc_status_task = asyncio.create_task(
                     self.doc_status.upsert(
                         {
                             doc_id: {
@@ -857,13 +858,28 @@ class LightRAG:
"created_at": status_doc.created_at, "created_at": status_doc.created_at,
} }
} }
), )
self.chunks_vdb.upsert(chunks), )
self._process_entity_relation_graph(chunks), chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks)
)
entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(chunks)
)
full_docs_task = asyncio.create_task(
self.full_docs.upsert( self.full_docs.upsert(
{doc_id: {"content": status_doc.content}} {doc_id: {"content": status_doc.content}}
), )
self.text_chunks.upsert(chunks), )
text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks)
)
tasks = [
doc_status_task,
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
] ]
try: try:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
@@ -881,9 +897,25 @@ class LightRAG:
                             }
                         )
                 except Exception as e:
-                    logger.error(
+                    # Log error and update pipeline status
+                    error_msg = (
                         f"Failed to process document {doc_id}: {str(e)}"
                     )
+                    logger.error(error_msg)
+                    pipeline_status["latest_message"] = error_msg
+                    pipeline_status["history_messages"].append(error_msg)
+
+                    # Cancel other tasks as they are no longer meaningful
+                    for task in [
+                        chunks_vdb_task,
+                        entity_relation_task,
+                        full_docs_task,
+                        text_chunks_task,
+                    ]:
+                        if not task.done():
+                            task.cancel()
+
+                    # Update document status to failed
                     await self.doc_status.upsert(
                         {
                             doc_id: {
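The restructuring above replaces a bare list of coroutines with named asyncio tasks so that, when one stage fails, the still-running stages can be cancelled instead of finishing work for a document that will be marked as failed anyway. A self-contained sketch of the same pattern (generic stage names, not this repository's API):

import asyncio

async def step(name: str, delay: float, fail: bool = False) -> str:
    # Simulated unit of work; raises to mimic a failed pipeline stage.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name

async def main() -> None:
    # Keep references to the tasks so they can be cancelled individually.
    extract_task = asyncio.create_task(step("extract", 0.1, fail=True))
    upsert_task = asyncio.create_task(step("upsert", 1.0))
    tasks = [extract_task, upsert_task]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"pipeline error: {e}")
        # Cancel whatever has not finished; the cancelled tasks are simply
        # dropped rather than awaited again.
        for task in tasks:
            if not task.done():
                task.cancel()

asyncio.run(main())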
@@ -926,7 +958,7 @@ class LightRAG:
             pipeline_status["latest_message"] = log_message
             pipeline_status["history_messages"].append(log_message)

-            # 获取新的待处理文档
+            # Check for pending documents again
             processing_docs, failed_docs, pending_docs = await asyncio.gather(
                 self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                 self.doc_status.get_docs_by_status(DocStatus.FAILED),
@@ -1949,6 +1981,9 @@ class LightRAG:
                     new_entity_name, new_node_data
                 )

+                # Store relationships that need to be updated
+                relations_to_update = []
+
                 # Get all edges related to the original entity
                 edges = await self.chunk_entity_relation_graph.get_node_edges(
                     entity_name
@@ -1964,10 +1999,16 @@ class LightRAG:
                         await self.chunk_entity_relation_graph.upsert_edge(
                             new_entity_name, target, edge_data
                         )
+                        relations_to_update.append(
+                            (new_entity_name, target, edge_data)
+                        )
                     else:  # target == entity_name
                         await self.chunk_entity_relation_graph.upsert_edge(
                             source, new_entity_name, edge_data
                         )
+                        relations_to_update.append(
+                            (source, new_entity_name, edge_data)
+                        )

                 # Delete old entity
                 await self.chunk_entity_relation_graph.delete_node(entity_name)
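The rename path above re-creates each edge of the old entity under the new name and remembers the affected pairs for a later vector refresh. A dictionary-based sketch of that bookkeeping (generic graph structure, not the repository's storage API):

# Edges keyed by (source, target); values hold edge attributes.
edges = {
    ("Alice", "Bob"): {"description": "colleagues", "weight": 1.0},
    ("Carol", "Alice"): {"description": "mentor of", "weight": 0.8},
}

old_name, new_name = "Alice", "Alicia"
relations_to_update = []

for (src, tgt), data in list(edges.items()):
    if old_name not in (src, tgt):
        continue
    # Re-point whichever endpoint carried the old name.
    new_src = new_name if src == old_name else src
    new_tgt = new_name if tgt == old_name else tgt
    edges[(new_src, new_tgt)] = data
    del edges[(src, tgt)]
    # Remember the rewritten edge so its vector record can be refreshed later.
    relations_to_update.append((new_src, new_tgt, data))

print(sorted(edges))             # [('Alicia', 'Bob'), ('Carol', 'Alicia')]
print(len(relations_to_update))  # 2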
@@ -1976,6 +2017,35 @@ class LightRAG:
                 old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
                 await self.entities_vdb.delete([old_entity_id])

+                # Update relationship vector representations
+                for src, tgt, edge_data in relations_to_update:
+                    description = edge_data.get("description", "")
+                    keywords = edge_data.get("keywords", "")
+                    source_id = edge_data.get("source_id", "")
+                    weight = float(edge_data.get("weight", 1.0))
+
+                    # Create new content for embedding
+                    content = f"{src}\t{tgt}\n{keywords}\n{description}"
+
+                    # Calculate relationship ID
+                    relation_id = compute_mdhash_id(src + tgt, prefix="rel-")
+
+                    # Prepare data for vector database update
+                    relation_data = {
+                        relation_id: {
+                            "content": content,
+                            "src_id": src,
+                            "tgt_id": tgt,
+                            "source_id": source_id,
+                            "description": description,
+                            "keywords": keywords,
+                            "weight": weight,
+                        }
+                    }
+
+                    # Update vector database
+                    await self.relationships_vdb.upsert(relation_data)
+
                 # Update working entity name to new name
                 entity_name = new_entity_name
             else:
@@ -2086,7 +2156,7 @@ class LightRAG:
                 weight = float(new_edge_data.get("weight", 1.0))

                 # Create content for embedding
-                content = f"{keywords}\t{source_entity}\n{target_entity}\n{description}"
+                content = f"{source_entity}\t{target_entity}\n{keywords}\n{description}"

                 # Calculate relation ID
                 relation_id = compute_mdhash_id(
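Both hunks above settle on the same embedding text layout for a relationship, entity pair first, then keywords, then description, and derive the vector-store key from the concatenated entity names. A standalone sketch that mirrors that layout (the hashing helper here is a stand-in for the repository's compute_mdhash_id, assumed to be a prefixed md5 digest):

from hashlib import md5

def mdhash_id(content: str, prefix: str = "") -> str:
    # Stand-in for compute_mdhash_id: prefix + md5 hex digest of the content.
    return prefix + md5(content.encode()).hexdigest()

def build_relation_record(src: str, tgt: str, keywords: str, description: str,
                          source_id: str = "", weight: float = 1.0) -> dict:
    # Embedding text starts with the entity pair, then keywords, then description.
    content = f"{src}\t{tgt}\n{keywords}\n{description}"
    relation_id = mdhash_id(src + tgt, prefix="rel-")
    return {
        relation_id: {
            "content": content,
            "src_id": src,
            "tgt_id": tgt,
            "source_id": source_id,
            "description": description,
            "keywords": keywords,
            "weight": weight,
        }
    }

record = build_relation_record("Alice", "Bob", "colleague", "Alice works with Bob.")
print(next(iter(record)))  # rel-<md5 of "AliceBob">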

View File

@@ -1242,9 +1242,11 @@ async def _find_most_related_text_unit_from_entities(
     all_text_units_lookup = {}
     tasks = []

     for index, (this_text_units, this_edges) in enumerate(zip(text_units, edges)):
         for c_id in this_text_units:
             if c_id not in all_text_units_lookup:
+                all_text_units_lookup[c_id] = index
                 tasks.append((c_id, index, this_edges))

     results = await asyncio.gather(
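The added line records a chunk id in the lookup as soon as it is queued, so the same chunk reached from a later entity is not fetched twice by the gather that follows. A minimal sketch of the dedup pattern (generic data, not from the repository):

text_units = [["c1", "c2"], ["c2", "c3"]]  # chunk ids reachable from two entities

lookup: dict[str, int] = {}
tasks: list[tuple[str, int]] = []

for index, unit_ids in enumerate(text_units):
    for c_id in unit_ids:
        if c_id not in lookup:
            # Mark the chunk as seen *before* queuing it, so a duplicate id
            # seen from a later entity is skipped.
            lookup[c_id] = index
            tasks.append((c_id, index))

print(tasks)  # [('c1', 0), ('c2', 0), ('c3', 1)]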