Description
What would you like?
max_concurrency in map/parallel doesn't throttle async operations like invoke
Problem Statement
When using `context.map()` or `context.parallel()` with `max_concurrency` set, you'd expect it to limit the number of concurrent operations in progress. However, the current implementation only limits the number of active threads, not the number of in-flight async operations.
This means that when map/parallel iterations contain async operations like `context.invoke()`, `context.wait()`, or `context.wait_for_condition()`, all items can be started nearly simultaneously, regardless of the `max_concurrency` setting. While this maximizes throughput, it is surprising given the semantics implied by `max_concurrency`.
Current Behavior
How it works today
- All tasks are submitted to a `ThreadPoolExecutor` immediately
- The thread pool has `max_workers = max_concurrency` threads
- When a task calls an async operation like `context.invoke()`:
  - It creates a checkpoint
  - Raises `SuspendExecution`
  - The thread completes and returns to the pool
- The next queued task starts immediately
- Result: all async operations are initiated as fast as threads can cycle through items (typically milliseconds per item)
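The thread-cycling effect above can be reproduced with a minimal, self-contained simulation (this is not SDK code; `SuspendExecution` here is a stand-in class for the SDK's suspension signal):

```python
# Simulation of why max_workers alone does not bound in-flight work:
# each task "suspends" immediately, freeing its thread, so the pool
# cycles through all 100 items almost instantly.
from concurrent.futures import ThreadPoolExecutor


class SuspendExecution(Exception):
    """Stand-in for the SDK's suspension signal."""


started = []


def task(item):
    started.append(item)      # the async operation is now in-flight
    raise SuspendExecution()  # thread returns to the pool right away


with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(task, i) for i in range(100)]

# All 100 "invocations" were initiated despite only 5 worker threads.
print(len(started))  # 100
```

Because the semaphore-like limit is attached to thread occupancy rather than logical task lifetime, suspension defeats it entirely.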
Example
```python
# User expects: Only 5 Lambda invocations in-flight at once
# Actual behavior: All 100 Lambda invocations start within ~1 second
result = context.map(
    items=range(100),
    func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
    config=MapConfig(max_concurrency=5),
)
```

With `max_concurrency=5` and 100 items:

- Tasks 0-4 start in 5 threads
- Each calls `invoke()` and suspends (releasing its thread)
- Tasks 5-9 start immediately in the freed threads
- This continues rapidly until all 100 invocations have been started
- All 100 Lambda functions are invoked concurrently
Expected Behavior
`max_concurrency` should limit the number of in-flight operations (including suspended ones), not just active threads.
Desired behavior
- Start `max_concurrency` tasks initially
- When a task completes (reaches a SUCCEEDED or FAILED state), start the next pending task
- Suspended tasks remain "in-flight" and count against the concurrency limit
- If all in-flight tasks are suspended, the parent suspends too
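One way to express these semantics is to tie a semaphore slot to the task's full logical lifetime rather than to thread occupancy. The sketch below is a simplified illustration under that assumption, not the SDK's implementation; `run_with_inflight_limit` is a hypothetical helper:

```python
# Sketch: a slot is held until the task reaches a terminal state,
# so a suspended task would still count against the limit.
import threading
from concurrent.futures import ThreadPoolExecutor


def run_with_inflight_limit(items, func, max_concurrency):
    slots = threading.Semaphore(max_concurrency)
    lock = threading.Lock()
    in_flight = 0
    peak = 0
    results = [None] * len(items)

    def worker(idx, item):
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        try:
            results[idx] = func(item)  # may include suspend/resume cycles
        finally:
            with lock:
                in_flight -= 1
            slots.release()  # free the slot only at a terminal state

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        for idx, item in enumerate(items):
            slots.acquire()  # blocks once max_concurrency tasks are in-flight
            pool.submit(worker, idx, item)

    return results, peak
```

For example, `run_with_inflight_limit(list(range(100)), lambda x: x * 2, 5)` never has more than 5 tasks in flight at once. In the real SDK the release point would have to be the durable task's terminal state transition, not simply the function returning.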
Example with expected behavior
```python
result = context.map(
    items=range(100),
    func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
    config=MapConfig(max_concurrency=5),
)
```

With `max_concurrency=5` and 100 items:

- Tasks 0-4 start and invoke Lambda functions
- Parent suspends waiting for responses
- When Lambda 0 completes, task 5 starts
- When Lambda 1 completes, task 6 starts
- At most 5 Lambda invocations are in-flight at any time
Impact
Current workaround

Users must manually pre-batch their arrays:

```python
# Manual batching to achieve at most 5 concurrent invocations
def batch_items(items, batch_size):
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

batches = batch_items(items, batch_size=5)
for batch in batches:
    for i, item in enumerate(batch):
        context.invoke(f"batch_{i}", "MyFunction", item)
```

This is verbose and easy to get wrong.
Related Discussion
See Discussion #278 for the initial report.
Proposed Solution
Implement dynamic task submission where:
- Track "in-flight" tasks (PENDING + RUNNING + SUSPENDED states)
- Only submit new tasks when `in_flight_count < max_concurrency`
- Decrement `in_flight_count` only when tasks reach terminal states (COMPLETED or FAILED)
- Suspended tasks remain in-flight and block new submissions
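The bookkeeping can be sketched as a small single-threaded state machine (names like `Scheduler` and `on_transition` are illustrative, not actual SDK API):

```python
# Sketch of the proposed bookkeeping: in_flight counts
# PENDING/RUNNING/SUSPENDED tasks; only terminal transitions
# (COMPLETED/FAILED) free a slot for the next submission.
from collections import deque
from enum import Enum, auto


class State(Enum):
    PENDING = auto()
    RUNNING = auto()
    SUSPENDED = auto()
    COMPLETED = auto()
    FAILED = auto()


TERMINAL = {State.COMPLETED, State.FAILED}


class Scheduler:
    def __init__(self, max_concurrency):
        self.max_concurrency = max_concurrency
        self.in_flight = 0
        self.queue = deque()

    def submit(self, task_id):
        self.queue.append(task_id)
        self._drain()

    def on_transition(self, task_id, new_state):
        # Suspension does NOT decrement in_flight; the task keeps its slot.
        if new_state in TERMINAL:
            self.in_flight -= 1
            self._drain()

    def _drain(self):
        while self.queue and self.in_flight < self.max_concurrency:
            self.queue.popleft()
            self.in_flight += 1  # task starts; stays in-flight even if suspended
```

With `max_concurrency=5` and 10 submissions, a SUSPENDED transition leaves `in_flight` at 5, while a COMPLETED transition frees a slot and immediately starts the next queued task.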
Acceptance Criteria
- `max_concurrency` limits in-flight operations, not just active threads
- Suspended tasks count against the concurrency limit
- New tasks only start when existing tasks complete (not when they suspend)
- All existing tests pass
- New tests verify the corrected concurrency behavior
- Documentation updated to clarify concurrency semantics
- Backward compatibility strategy considered
Additional Context
Code locations
- `src/aws_durable_execution_sdk_python/concurrency/executor.py`: `ConcurrentExecutor.execute()`
- `src/aws_durable_execution_sdk_python/operation/map.py`: Map operation
- `src/aws_durable_execution_sdk_python/operation/parallel.py`: Parallel operation
Key insight
The issue stems from submitting all tasks upfront:
```python
futures = [submit_task(exe_state) for exe_state in self.executables_with_state]
```

Replace this with dynamic submission based on the in-flight count.
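A sliding-window submission loop is one way to do this. The sketch below reuses the names `submit_task` and `executables_with_state` from the snippet above, but the surrounding function is hypothetical; note that for the real fix, a future counting as "done" would have to mean the durable task reached a terminal state, not merely that its thread returned after suspending:

```python
# Sketch: keep at most max_concurrency futures outstanding and top up
# the window as each one completes (result/error handling omitted).
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def execute_with_window(pool, submit_task, executables_with_state, max_concurrency):
    pending = iter(executables_with_state)
    futures = set()

    # Prime the window with the first max_concurrency tasks.
    for exe_state in pending:
        futures.add(pool.submit(submit_task, exe_state))
        if len(futures) >= max_concurrency:
            break

    # Each completion frees exactly one slot for the next pending task.
    while futures:
        done, futures = wait(futures, return_when=FIRST_COMPLETED)
        for _ in done:
            nxt = next(pending, None)
            if nxt is not None:
                futures.add(pool.submit(submit_task, nxt))
```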
Possible Implementation
No response
Is this a breaking change?
No
Does this require an RFC?
No
Additional Context
No response