fix: cancel pending tasks when any chunk processing fails
Modify extract_entities function to terminate all pending text chunk processing tasks when any single chunk processing fails.
This commit is contained in:
@@ -666,8 +666,33 @@ async def extract_entities(
|
|||||||
return maybe_nodes, maybe_edges
|
return maybe_nodes, maybe_edges
|
||||||
|
|
||||||
# Handle all chunks in parallel and collect results
|
# Handle all chunks in parallel and collect results
|
||||||
tasks = [_process_single_content(c) for c in ordered_chunks]
|
# Create tasks for all chunks
|
||||||
chunk_results = await asyncio.gather(*tasks)
|
tasks = []
|
||||||
|
for c in ordered_chunks:
|
||||||
|
task = asyncio.create_task(_process_single_content(c))
|
||||||
|
tasks.append(task)
|
||||||
|
|
||||||
|
# Wait for tasks to complete or for the first exception to occur
|
||||||
|
# This allows us to cancel remaining tasks if any task fails
|
||||||
|
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
|
||||||
|
|
||||||
|
# Check if any task raised an exception
|
||||||
|
for task in done:
|
||||||
|
if task.exception():
|
||||||
|
# If a task failed, cancel all pending tasks
|
||||||
|
# This prevents unnecessary processing since the parent function will abort anyway
|
||||||
|
for pending_task in pending:
|
||||||
|
pending_task.cancel()
|
||||||
|
|
||||||
|
# Wait for cancellation to complete
|
||||||
|
if pending:
|
||||||
|
await asyncio.wait(pending)
|
||||||
|
|
||||||
|
# Re-raise the exception to notify the caller
|
||||||
|
raise task.exception()
|
||||||
|
|
||||||
|
# If all tasks completed successfully, collect results
|
||||||
|
chunk_results = [task.result() for task in tasks]
|
||||||
|
|
||||||
# Collect all nodes and edges from all chunks
|
# Collect all nodes and edges from all chunks
|
||||||
all_nodes = defaultdict(list)
|
all_nodes = defaultdict(list)
|
||||||
|
Reference in New Issue
Block a user