diff --git a/lightrag/operate.py b/lightrag/operate.py index 055f0f80..318efdfc 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -674,11 +674,17 @@ async def extract_entities( # Return the extracted nodes and edges for centralized processing return maybe_nodes, maybe_edges - # Handle all chunks in parallel and collect results - # Create tasks for all chunks + # Get max async tasks limit from global_config + llm_model_max_async = global_config.get("llm_model_max_async", 4) + semaphore = asyncio.Semaphore(llm_model_max_async) + + async def _process_with_semaphore(chunk): + async with semaphore: + return await _process_single_content(chunk) + tasks = [] for c in ordered_chunks: - task = asyncio.create_task(_process_single_content(c)) + task = asyncio.create_task(_process_with_semaphore(c)) tasks.append(task) # Wait for tasks to complete or for the first exception to occur @@ -755,7 +761,7 @@ async def extract_entities( total_entities_count = len(entities_data) total_relations_count = len(relationships_data) - log_message = f"Updating vector storage: {total_entities_count} entities" + log_message = f"Updating vector storage: {total_entities_count} entities..." logger.info(log_message) if pipeline_status is not None: async with pipeline_status_lock: @@ -776,7 +782,9 @@ async def extract_entities( } await entity_vdb.upsert(data_for_vdb) - log_message = f"Updating vector storage: {total_relations_count} relationships" + log_message = ( + f"Updating vector storage: {total_relations_count} relationships..." + ) logger.info(log_message) if pipeline_status is not None: async with pipeline_status_lock: