From b4b4c0fc72bf94c8bfa72150105249061879d62b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 20 Jan 2026 15:08:16 +0000 Subject: [PATCH] fix(batch-queue): allow batch queue consumers to run independently from the run engine worker The new environment variable `BATCH_QUEUE_WORKER_ENABLED` (default: true) can now be set independently of `RUN_ENGINE_WORKER_ENABLED` --- apps/webapp/app/env.server.ts | 1 + apps/webapp/app/v3/runEngine.server.ts | 1 + .../run-engine/src/engine/index.ts | 70 +++++++++---------- .../run-engine/src/engine/types.ts | 1 + 4 files changed, 38 insertions(+), 35 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 82ba812a67..1dc3091f16 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -952,6 +952,7 @@ const EnvironmentSchema = z BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100), BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3), BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50), + BATCH_QUEUE_WORKER_ENABLED: BoolEnv.default(true), // Number of master queue shards for horizontal scaling BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1), // Maximum queues to fetch from master queue per iteration diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 10f8e550b3..efba5fbdb0 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -179,6 +179,7 @@ function createRunEngine() { : undefined, consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT, consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS, + consumerEnabled: env.BATCH_QUEUE_WORKER_ENABLED, // Default processing concurrency when no specific limit is set // This is overridden per-batch based on the plan type at batch creation defaultConcurrency: env.BATCH_CONCURRENCY_LIMIT_DEFAULT, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 
88e91b1bb3..1021c32fbf 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -202,9 +202,9 @@ export class RunEngine { id: payload.waitpointId, output: payload.error ? { - value: payload.error, - isError: true, - } + value: payload.error, + isError: true, + } : undefined, }); }, @@ -329,8 +329,8 @@ export class RunEngine { }); // Initialize BatchQueue for DRR-based batch processing (if configured) - // Only start consumers if worker is not disabled (same as main worker) - const startConsumers = !options.worker.disabled; + // Start consumers unless batchQueue.consumerEnabled is explicitly false (defaults to true when unset) + const startBatchQueueConsumers = options.batchQueue?.consumerEnabled ?? true; this.batchQueue = new BatchQueue({ redis: { @@ -348,7 +348,7 @@ export class RunEngine { consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100, defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10, globalRateLimiter: options.batchQueue?.globalRateLimiter, - startConsumers, + startConsumers: startBatchQueueConsumers, tracer: options.tracer, meter: options.meter, }); @@ -357,7 +357,7 @@ export class RunEngine { consumerCount: options.batchQueue?.consumerCount ?? 2, drrQuantum: options.batchQueue?.drr?.quantum ?? 5, defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10, - consumersEnabled: startConsumers, + consumersEnabled: startBatchQueueConsumers, }); this.runAttemptSystem = new RunAttemptSystem({ @@ -464,18 +464,18 @@ export class RunEngine { debounce: debounce.mode === "trailing" ? { - ...debounce, - updateData: { - payload, - payloadType, - metadata, - metadataType, - tags, - maxAttempts, - maxDurationInSeconds, - machine, - }, - } + ...debounce, + updateData: { + payload, + payloadType, + metadata, + metadataType, + tags, + maxAttempts, + maxDurationInSeconds, + machine, + }, + } : debounce, tx: prisma, }); @@ -574,8 +574,8 @@ export class RunEngine { tags.length === 0 ? 
undefined : { - connect: tags, - }, + connect: tags, + }, runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name), oneTimeUseToken, parentTaskRunId, @@ -598,10 +598,10 @@ export class RunEngine { realtimeStreamsVersion, debounce: debounce ? { - key: debounce.key, - delay: debounce.delay, - createdAt: new Date(), - } + key: debounce.key, + delay: debounce.delay, + createdAt: new Date(), + } : undefined, executionSnapshots: { create: { @@ -1750,17 +1750,17 @@ export class RunEngine { const error = latestSnapshot.environmentType === "DEVELOPMENT" ? ({ - type: "INTERNAL_ERROR", - code: taskStalledErrorCode, - message: errorMessage, - } satisfies TaskRunInternalError) + type: "INTERNAL_ERROR", + code: taskStalledErrorCode, + message: errorMessage, + } satisfies TaskRunInternalError) : this.options.treatProductionExecutionStallsAsOOM - ? ({ + ? ({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_OOM_KILLED", message: "Run was terminated due to running out of memory", } satisfies TaskRunInternalError) - : ({ + : ({ type: "INTERNAL_ERROR", code: taskStalledErrorCode, message: errorMessage, @@ -1775,10 +1775,10 @@ export class RunEngine { error, retry: shouldRetry ? 
{ - //250ms in the future - timestamp: Date.now() + retryDelay, - delay: retryDelay, - } + //250ms in the future + timestamp: Date.now() + retryDelay, + delay: retryDelay, + } : undefined, }, forceRequeue: true, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index c48316691b..ee5176c2fa 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -80,6 +80,7 @@ export type RunEngineOptions = { shardCount?: number; /** Worker queue blocking timeout in seconds (enables two-stage processing) */ workerQueueBlockingTimeoutSeconds?: number; + consumerEnabled?: boolean; consumerCount?: number; consumerIntervalMs?: number; /** Default processing concurrency per environment when no specific limit is set */