Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/aws_durable_execution_sdk_python/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ class CheckpointBatcherConfig:
Attributes:
max_batch_size_bytes: Maximum batch size in bytes (default: 750KB)
max_batch_time_seconds: Maximum time to wait before flushing batch (default: 1.0 second)
max_batch_operations: Maximum number of operations per batch (default: unlimited)
max_batch_operations: Maximum number of operations per batch (default: 250)
"""

max_batch_size_bytes: int = 750 * 1024 # 750KB - private readonly MAX_PAYLOAD_SIZE
max_batch_time_seconds: float = 1.0 # 1 second default
max_batch_operations: int | float = float("inf") # No operation limit by default
max_batch_size_bytes: int = 750 * 1024 # 750KB
max_batch_time_seconds: float = 1.0
max_batch_operations: int = 250


@dataclass(frozen=True)
Expand Down
45 changes: 44 additions & 1 deletion tests/state_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def test_checkpoint_batcher_config_default_values():

assert config.max_batch_size_bytes == 750 * 1024 # 750KB
assert config.max_batch_time_seconds == 1.0
assert config.max_batch_operations == float("inf")
assert config.max_batch_operations == 250


def test_checkpoint_batcher_config_custom_values():
Expand All @@ -803,6 +803,49 @@ def test_checkpoint_batcher_config_immutable():
config.max_batch_size_bytes = 1000


def test_checkpoint_batch_respects_default_max_items_limit():
"""Test that batch collection respects the default MAX_ITEMS_IN_BATCH (250) limit.

This ensures consistency across all Durable Execution SDK implementations.
"""
mock_lambda_client = Mock(spec=LambdaClient)

# Use default config (max_batch_operations=250)
config = CheckpointBatcherConfig(
max_batch_size_bytes=10 * 1024 * 1024,
max_batch_time_seconds=10.0,
)

state = ExecutionState(
durable_execution_arn="test_arn",
initial_checkpoint_token="token123", # noqa: S106
operations={},
service_client=mock_lambda_client,
batcher_config=config,
)

# Enqueue 300 small operations (exceeds MAX_ITEMS_IN_BATCH of 250)
for i in range(300):
operation_update = OperationUpdate(
operation_id=f"op_{i}",
operation_type=OperationType.STEP,
action=OperationAction.START,
)
state._checkpoint_queue.put(QueuedOperation(operation_update, None))

# Collect first batch
batch1 = state._collect_checkpoint_batch()

# First batch should have exactly 250 items
assert len(batch1) == 250

# Collect second batch
batch2 = state._collect_checkpoint_batch()

# Second batch should have remaining 50 items
assert len(batch2) == 50


def test_calculate_operation_size_with_operation():
"""Test _calculate_operation_size with a real operation."""
operation_update = OperationUpdate(
Expand Down
Loading