[Feature]: max_concurrency to limit operations rather than threads #279

@yaythomas

Description

What would you like?

max_concurrency in map/parallel doesn't throttle async operations like invoke

Problem Statement

When using context.map() or context.parallel() with max_concurrency set, you'd expect it to limit the number of concurrent operations in progress. However, the current implementation only limits the number of active threads, not the number of in-flight async operations.

This means that when map/parallel iterations contain async operations such as context.invoke(), context.wait(), or context.wait_for_condition(), all items can be started nearly simultaneously, regardless of the max_concurrency setting. While this can speed up overall processing, it is surprising given what the max_concurrency semantic implies.

Current Behavior

How it works today

  1. All tasks are submitted to a ThreadPoolExecutor immediately
  2. The thread pool has max_workers = max_concurrency threads
  3. When a task calls an async operation like context.invoke():
    • It creates a checkpoint
    • Raises SuspendExecution
    • The thread completes and returns to the pool
    • The next queued task starts immediately
  4. Result: All async operations are initiated as fast as threads can cycle through items (typically milliseconds per item)
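The thread-cycling behavior described above can be reproduced with a small standalone sketch (a plain ThreadPoolExecutor, no SDK involved — the `iteration` function is a stand-in for a map iteration, not the SDK's API): each task "suspends" by returning immediately, so 5 workers initiate all 100 operations almost instantly.

```python
from concurrent.futures import ThreadPoolExecutor

started = []  # records every operation that gets initiated

def iteration(i):
    # Stand-in for a map iteration that calls context.invoke():
    # it initiates the async operation, then "suspends" by returning,
    # which hands the thread straight back to the pool.
    started.append(i)
    return i

# All 100 tasks are submitted upfront; max_workers=5 only caps how many
# threads run at once, not how many operations are in flight.
with ThreadPoolExecutor(max_workers=5) as pool:
    for f in [pool.submit(iteration, i) for i in range(100)]:
        f.result()

print(len(started))  # 100 operations initiated despite a "limit" of 5
```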

Example

# User expects: Only 5 Lambda invocations in-flight at once
# Actual behavior: All 100 Lambda invocations start within ~1 second
result = context.map(
    items=range(100),
    func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
    config=MapConfig(max_concurrency=5)
)

With max_concurrency=5 and 100 items:

  • Tasks 0-4 start in 5 threads
  • Each calls invoke() and suspends (releases thread)
  • Tasks 5-9 start immediately in the freed threads
  • This continues rapidly until all 100 invocations are started
  • All 100 Lambda functions are invoked concurrently

Expected Behavior

max_concurrency should limit the number of in-flight operations (including suspended ones), not just active threads.

Desired behavior

  1. Start max_concurrency tasks initially
  2. When a task completes (reaches SUCCEEDED or FAILED state), start the next pending task
  3. Suspended tasks remain "in-flight" and count against the concurrency limit
  4. If all in-flight tasks are suspended, the parent suspends too

Example with expected behavior

result = context.map(
    items=range(100),
    func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
    config=MapConfig(max_concurrency=5)
)

With max_concurrency=5 and 100 items:

  • Tasks 0-4 start and invoke Lambda functions
  • Parent suspends waiting for responses
  • When Lambda 0 completes, task 5 starts
  • When Lambda 1 completes, task 6 starts
  • At most 5 Lambda invocations are in-flight at any time

Impact

Current workarounds

Users must manually pre-batch their items:

# Manual batching to achieve at most 5 concurrent invocations
def batch_items(items, batch_size):
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

batches = batch_items(items, batch_size=5)
for batch in batches:
    for i, item in enumerate(batch):
        context.invoke(f"batch_{i}", "MyFunction", item)
    # ...and each batch must be waited on before the next loop iteration,
    # or the limit still isn't enforced

This is not great.

Related Discussion

See Discussion #278 for the initial report.

Proposed Solution

Implement dynamic task submission where:

  1. Track "in-flight" tasks (PENDING + RUNNING + SUSPENDED states)
  2. Only submit new tasks when in_flight_count < max_concurrency
  3. Decrement in_flight_count only when tasks reach terminal states (SUCCEEDED or FAILED)
  4. Suspended tasks remain in-flight and block new submissions
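The state tracking in items 1–4 could be sketched as follows (names like InFlightTracker are hypothetical, not the SDK's API): suspended tasks keep counting against the limit, and only terminal states free a slot.

```python
import threading
from enum import Enum, auto

class TaskState(Enum):
    PENDING = auto()
    RUNNING = auto()
    SUSPENDED = auto()
    SUCCEEDED = auto()
    FAILED = auto()

# Per item 1: PENDING, RUNNING, and SUSPENDED all count as in-flight.
IN_FLIGHT_STATES = {TaskState.PENDING, TaskState.RUNNING, TaskState.SUSPENDED}

class InFlightTracker:
    """Hypothetical helper: gates new submissions on the in-flight count."""
    def __init__(self, max_concurrency):
        self.max_concurrency = max_concurrency
        self._states = {}
        self._lock = threading.Lock()

    def set_state(self, task_id, state):
        with self._lock:
            self._states[task_id] = state

    def in_flight(self):
        with self._lock:
            return sum(1 for s in self._states.values() if s in IN_FLIGHT_STATES)

    def can_submit(self):
        # Item 2: only submit while in_flight_count < max_concurrency.
        return self.in_flight() < self.max_concurrency

tracker = InFlightTracker(max_concurrency=2)
tracker.set_state("a", TaskState.RUNNING)
tracker.set_state("b", TaskState.RUNNING)
print(tracker.can_submit())  # False: both slots taken
tracker.set_state("a", TaskState.SUSPENDED)
print(tracker.can_submit())  # still False: suspension is not completion (item 4)
tracker.set_state("a", TaskState.SUCCEEDED)
print(tracker.can_submit())  # True: a terminal state frees the slot (item 3)
```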

Acceptance Criteria

  • max_concurrency limits in-flight operations, not just active threads
  • Suspended tasks count against the concurrency limit
  • New tasks only start when existing tasks complete (not when they suspend)
  • All existing tests pass
  • New tests verify the corrected concurrency behavior
  • Documentation updated to clarify concurrency semantics
  • Consider backward compatibility strategy

Additional Context

Code locations

  • src/aws_durable_execution_sdk_python/concurrency/executor.py: ConcurrentExecutor.execute()
  • src/aws_durable_execution_sdk_python/operation/map.py: Map operation
  • src/aws_durable_execution_sdk_python/operation/parallel.py: Parallel operation

Key insight

The issue stems from submitting all tasks upfront:

futures = [submit_task(exe_state) for exe_state in self.executables_with_state]

Replace this with dynamic submission based on in-flight count.
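A minimal shape for that replacement, using a plain ThreadPoolExecutor (a sketch only — here "done" means the future finished, whereas the real executor must free a slot only on SUCCEEDED/FAILED, keeping suspended tasks counted as in-flight):

```python
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def execute_bounded(pool, tasks, max_concurrency):
    # Submit dynamically: at most max_concurrency tasks are in flight,
    # and a new task starts only when an in-flight one completes.
    pending = list(tasks)
    in_flight = set()
    results = []
    while pending or in_flight:
        while pending and len(in_flight) < max_concurrency:
            in_flight.add(pool.submit(pending.pop(0)))
        done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
        results.extend(f.result() for f in done)
    return results

with ThreadPoolExecutor(max_workers=5) as pool:
    out = execute_bounded(pool, [lambda i=i: i * i for i in range(10)], 5)
print(sorted(out))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```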

Possible Implementation

No response

Is this a breaking change?

No

Does this require an RFC?

No

Metadata

Assignees

No one assigned

    Labels

    enhancement (New feature or request)
