Set max parallel chunk processing according to MAX_ASYNC of LLM

This commit is contained in:
yangdx
2025-04-22 15:03:46 +08:00
parent 21c0bb7abf
commit 1eef9b7205

View File

@@ -674,11 +674,17 @@ async def extract_entities(
# Return the extracted nodes and edges for centralized processing # Return the extracted nodes and edges for centralized processing
return maybe_nodes, maybe_edges return maybe_nodes, maybe_edges
# Handle all chunks in parallel and collect results # Get max async tasks limit from global_config
# Create tasks for all chunks llm_model_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(llm_model_max_async)
async def _process_with_semaphore(chunk):
    """Run `_process_single_content` on one chunk while holding `semaphore`.

    The enclosing scope builds `semaphore` from `llm_model_max_async`, so at
    most that many chunks are inside `_process_single_content` at once.
    Returns whatever `_process_single_content` returns for the chunk.
    """
    # The slot is released when the `async with` block exits, including
    # when the awaited call raises.
    async with semaphore:
        result = await _process_single_content(chunk)
    return result
tasks = [] tasks = []
for c in ordered_chunks: for c in ordered_chunks:
task = asyncio.create_task(_process_single_content(c)) task = asyncio.create_task(_process_with_semaphore(c))
tasks.append(task) tasks.append(task)
# Wait for tasks to complete or for the first exception to occur # Wait for tasks to complete or for the first exception to occur
@@ -755,7 +761,7 @@ async def extract_entities(
total_entities_count = len(entities_data) total_entities_count = len(entities_data)
total_relations_count = len(relationships_data) total_relations_count = len(relationships_data)
log_message = f"Updating vector storage: {total_entities_count} entities" log_message = f"Updating vector storage: {total_entities_count} entities..."
logger.info(log_message) logger.info(log_message)
if pipeline_status is not None: if pipeline_status is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@@ -776,7 +782,9 @@ async def extract_entities(
} }
await entity_vdb.upsert(data_for_vdb) await entity_vdb.upsert(data_for_vdb)
log_message = f"Updating vector storage: {total_relations_count} relationships" log_message = (
f"Updating vector storage: {total_relations_count} relationships..."
)
logger.info(log_message) logger.info(log_message)
if pipeline_status is not None: if pipeline_status is not None:
async with pipeline_status_lock: async with pipeline_status_lock: