diff --git a/lightrag/operate.py b/lightrag/operate.py index bba7da9d..84e1364e 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -666,8 +666,33 @@ async def extract_entities( return maybe_nodes, maybe_edges # Handle all chunks in parallel and collect results - tasks = [_process_single_content(c) for c in ordered_chunks] - chunk_results = await asyncio.gather(*tasks) + # Create tasks for all chunks + tasks = [] + for c in ordered_chunks: + task = asyncio.create_task(_process_single_content(c)) + tasks.append(task) + + # Wait for tasks to complete or for the first exception to occur + # This allows us to cancel remaining tasks if any task fails + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + + # Check if any task raised an exception + for task in done: + if task.exception(): + # If a task failed, cancel all pending tasks + # This prevents unnecessary processing since the parent function will abort anyway + for pending_task in pending: + pending_task.cancel() + + # Wait for cancellation to complete + if pending: + await asyncio.wait(pending) + + # Re-raise the exception to notify the caller + raise task.exception() + + # If all tasks completed successfully, collect results + chunk_results = [task.result() for task in tasks] # Collect all nodes and edges from all chunks all_nodes = defaultdict(list)