diff --git a/src/aws_durable_execution_sdk_python/state.py b/src/aws_durable_execution_sdk_python/state.py index 6bc6826..8f8ff82 100644 --- a/src/aws_durable_execution_sdk_python/state.py +++ b/src/aws_durable_execution_sdk_python/state.py @@ -45,12 +45,12 @@ class CheckpointBatcherConfig: Attributes: max_batch_size_bytes: Maximum batch size in bytes (default: 750KB) max_batch_time_seconds: Maximum time to wait before flushing batch (default: 1.0 second) - max_batch_operations: Maximum number of operations per batch (default: unlimited) + max_batch_operations: Maximum number of operations per batch (default: 250) """ - max_batch_size_bytes: int = 750 * 1024 # 750KB - private readonly MAX_PAYLOAD_SIZE - max_batch_time_seconds: float = 1.0 # 1 second default - max_batch_operations: int | float = float("inf") # No operation limit by default + max_batch_size_bytes: int = 750 * 1024 # 750KB + max_batch_time_seconds: float = 1.0 + max_batch_operations: int = 250 @dataclass(frozen=True) diff --git a/tests/state_test.py b/tests/state_test.py index be9e6dd..968a3b4 100644 --- a/tests/state_test.py +++ b/tests/state_test.py @@ -779,7 +779,7 @@ def test_checkpoint_batcher_config_default_values(): assert config.max_batch_size_bytes == 750 * 1024 # 750KB assert config.max_batch_time_seconds == 1.0 - assert config.max_batch_operations == float("inf") + assert config.max_batch_operations == 250 def test_checkpoint_batcher_config_custom_values(): @@ -803,6 +803,49 @@ def test_checkpoint_batcher_config_immutable(): config.max_batch_size_bytes = 1000 +def test_checkpoint_batch_respects_default_max_items_limit(): + """Test that batch collection respects the default MAX_ITEMS_IN_BATCH (250) limit. + + This ensures consistency across all Durable Execution SDK implementations. + """ + mock_lambda_client = Mock(spec=LambdaClient) + + # Use default config (max_batch_operations=250) + config = CheckpointBatcherConfig( + max_batch_size_bytes=10 * 1024 * 1024, + max_batch_time_seconds=10.0, + ) + + state = ExecutionState( + durable_execution_arn="test_arn", + initial_checkpoint_token="token123", # noqa: S106 + operations={}, + service_client=mock_lambda_client, + batcher_config=config, + ) + + # Enqueue 300 small operations (exceeds MAX_ITEMS_IN_BATCH of 250) + for i in range(300): + operation_update = OperationUpdate( + operation_id=f"op_{i}", + operation_type=OperationType.STEP, + action=OperationAction.START, + ) + state._checkpoint_queue.put(QueuedOperation(operation_update, None)) + + # Collect first batch + batch1 = state._collect_checkpoint_batch() + + # First batch should have exactly 250 items + assert len(batch1) == 250 + + # Collect second batch + batch2 = state._collect_checkpoint_batch() + + # Second batch should have remaining 50 items + assert len(batch2) == 50 + + def test_calculate_operation_size_with_operation(): """Test _calculate_operation_size with a real operation.""" operation_update = OperationUpdate(