From 8d58fd956f963299b5041ff3412178d689d3beae Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Tue, 20 Jan 2026 15:23:41 +0000
Subject: [PATCH 1/3] fix(batch): handle locked ReadableStream when retrying batch trigger

When fetch crashes mid-stream during batch item upload (e.g., connection
reset, timeout), the request stream may remain locked by fetch's internal
reader. Attempting to cancel a locked stream throws 'Invalid state:
ReadableStream is locked', causing the batch operation to fail.

Added safeStreamCancel() helper that gracefully handles locked streams by
catching and ignoring the locked error. The stream will be cleaned up by
garbage collection when fetch eventually releases the reader.

Fixes customer issue where batchTrigger failed with ReadableStream locked
error during network instability.
---
 packages/core/src/v3/apiClient/index.ts       | 40 +++++++++++---
 .../src/v3/apiClient/streamBatchItems.test.ts | 52 +++++++++++++++++++
 2 files changed, 84 insertions(+), 8 deletions(-)

diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts
index 7b07b2fb60..8efbc762ab 100644
--- a/packages/core/src/v3/apiClient/index.ts
+++ b/packages/core/src/v3/apiClient/index.ts
@@ -444,14 +444,14 @@ export class ApiClient {
       if (retryResult.retry) {
         // Cancel the request stream before retry to prevent tee() from buffering
-        await forRequest.cancel();
+        await safeStreamCancel(forRequest);
 
         await sleep(retryResult.delay);
 
         // Use the backup stream for retry
         return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
       }
 
       // Not retrying - cancel the backup stream
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
       const errText = await response.text().catch((e) => (e as Error).message);
       let errJSON: Object | undefined;
@@ -471,7 +471,7 @@
       if (!parsed.success) {
         // Cancel backup stream since we're throwing
-        await forRetry.cancel();
+        await safeStreamCancel(forRetry);
         throw new Error(
           `Invalid response from server for batch ${batchId}: ${parsed.error.message}`
         );
 
@@ -484,14 +484,14 @@
         if (delay) {
           // Cancel the request stream before retry to prevent tee() from buffering
-          await forRequest.cancel();
+          await safeStreamCancel(forRequest);
 
           // Retry with the backup stream
           await sleep(delay);
           return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
         }
 
         // No more retries - cancel backup stream and throw descriptive error
-        await forRetry.cancel();
+        await safeStreamCancel(forRetry);
         throw new BatchNotSealedError({
           batchId,
           enqueuedCount: parsed.data.enqueuedCount ?? 0,
@@ -502,7 +502,7 @@ export class ApiClient {
       }
 
       // Success - cancel the backup stream to release resources
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
 
       return parsed.data;
     } catch (error) {
@@ -519,13 +519,13 @@
       const delay = calculateNextRetryDelay(retryOptions, attempt);
       if (delay) {
         // Cancel the request stream before retry to prevent tee() from buffering
-        await forRequest.cancel();
+        await safeStreamCancel(forRequest);
 
         await sleep(delay);
         return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
       }
 
       // No more retries - cancel the backup stream
-      await forRetry.cancel();
+      await safeStreamCancel(forRetry);
       // Wrap in a more descriptive error
       const cause = error instanceof Error ? error : new Error(String(error));
@@ -1731,6 +1731,30 @@ function sleep(ms: number): Promise<void> {
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
 
+/**
+ * Safely cancels a ReadableStream, handling the case where it might be locked.
+ *
+ * When fetch uses a ReadableStream as a request body and an error occurs mid-transfer
+ * (connection reset, timeout, etc.), the stream may remain locked by fetch's internal reader.
+ * Attempting to cancel a locked stream throws "Invalid state: ReadableStream is locked".
+ *
+ * This function gracefully handles that case by catching the error and doing nothing -
+ * the stream will be cleaned up by garbage collection when the reader is released.
+ */
+async function safeStreamCancel(stream: ReadableStream): Promise<void> {
+  try {
+    await stream.cancel();
+  } catch (error) {
+    // Ignore "locked" errors - the stream will be cleaned up when the reader is released.
+    // This happens when fetch crashes mid-read and doesn't release the reader lock.
+    if (error instanceof TypeError && String(error).includes("locked")) {
+      return;
+    }
+    // Re-throw unexpected errors
+    throw error;
+  }
+}
+
 // ============================================================================
 // NDJSON Stream Helpers
 // ============================================================================
diff --git a/packages/core/src/v3/apiClient/streamBatchItems.test.ts b/packages/core/src/v3/apiClient/streamBatchItems.test.ts
index c148f9b68e..23a42b5732 100644
--- a/packages/core/src/v3/apiClient/streamBatchItems.test.ts
+++ b/packages/core/src/v3/apiClient/streamBatchItems.test.ts
@@ -398,6 +398,58 @@ describe("streamBatchItems stream cancellation on retry", () => {
     expect(cancelCallCount).toBeGreaterThanOrEqual(1);
   });
 
+  it("handles locked stream when connection error occurs mid-read", async () => {
+    // This test simulates the real-world scenario where fetch throws an error
+    // while still holding the reader lock on the request body stream.
+    // This can happen with connection resets, timeouts, or network failures.
+    let callIndex = 0;
+
+    const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
+      const currentAttempt = callIndex;
+      callIndex++;
+
+      if (init?.body && init.body instanceof ReadableStream) {
+        if (currentAttempt === 0) {
+          // First attempt: Get a reader and start reading, but throw while still holding the lock.
+          // This simulates a connection error that happens mid-transfer.
+          const reader = init.body.getReader();
+          await reader.read(); // Start reading
+          // DON'T release the lock - this simulates fetch crashing mid-read
+          throw new TypeError("Connection reset by peer");
+        }
+
+        // Subsequent attempts: consume and release normally
+        await consumeAndRelease(init.body);
+      }
+
+      // Second attempt: success
+      return {
+        ok: true,
+        json: () =>
+          Promise.resolve({
+            id: "batch_test123",
+            itemsAccepted: 10,
+            itemsDeduplicated: 0,
+            sealed: true,
+          }),
+      };
+    });
+    globalThis.fetch = mockFetch;
+
+    const client = new ApiClient("http://localhost:3030", "tr_test_key");
+
+    // This should NOT throw "ReadableStream is locked" error
+    // Instead it should gracefully handle the locked stream and retry
+    const result = await client.streamBatchItems(
+      "batch_test123",
+      [{ index: 0, task: "test-task", payload: "{}" }],
+      { retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
+    );
+
+    expect(result.sealed).toBe(true);
+    expect(mockFetch).toHaveBeenCalledTimes(2);
+  });
+
   it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => {
     let cancelCallCount = 0;
     let callIndex = 0;

From dc5e755cf89cad7d3a6b7f81147ad3af3b03df28 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Tue, 20 Jan 2026 15:24:36 +0000
Subject: [PATCH 2/3] chore: add changeset

---
 .changeset/gentle-streams-flow.md | 5 +++++
 1 file changed, 5 insertions(+)
 create mode 100644 .changeset/gentle-streams-flow.md

diff --git a/.changeset/gentle-streams-flow.md b/.changeset/gentle-streams-flow.md
new file mode 100644
index 0000000000..16113f7c8e
--- /dev/null
+++ b/.changeset/gentle-streams-flow.md
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/core": patch
+---
+
+Fix batch trigger failing with "ReadableStream is locked" error when network failures occur mid-stream. Added safe stream cancellation that gracefully handles locked streams during retry attempts.

From d6d933dd6d2fd2d1c6ba35e3eb1e774a1d58f8b6 Mon Sep 17 00:00:00 2001
From: Eric Allam
Date: Tue, 20 Jan 2026 15:32:12 +0000
Subject: [PATCH 3/3] remove batch concurrency cleaner script
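
Note (editor's sketch, not part of the patches above): the TypeScript below illustrates the tee-and-retry pattern that #streamBatchItemsWithRetry relies on and that safeStreamCancel() exists to support. It is a minimal standalone example under stated assumptions, not the @trigger.dev/core implementation: the sendWithRetry name, the placeholder URL, the ok/maxAttempts retry condition, and the duplex cast are illustrative, and only the safeStreamCancel body mirrors the helper added in PATCH 1/3.

// Sketch only: a tee()'d request body where the abandoned branch is released
// with safeStreamCancel(), tolerating the case where fetch still holds the lock.

async function safeStreamCancel(stream: ReadableStream): Promise<void> {
  try {
    await stream.cancel();
  } catch (error) {
    // fetch may still hold the reader lock after a mid-transfer failure;
    // cancelling a locked stream throws a TypeError we can safely ignore.
    if (error instanceof TypeError && String(error).includes("locked")) {
      return;
    }
    throw error;
  }
}

async function sendWithRetry(
  body: ReadableStream,
  attempt = 1,
  maxAttempts = 3
): Promise<Response> {
  // tee() yields two identical branches: one is consumed by this attempt,
  // the other is kept untouched as the body for a potential retry.
  const [forRequest, forRetry] = body.tee();

  try {
    const response = await fetch("https://example.invalid/batch", {
      method: "POST",
      body: forRequest,
      // Streaming request bodies require half-duplex mode in Node's fetch.
      duplex: "half",
    } as RequestInit & { duplex: "half" });

    if (!response.ok && attempt < maxAttempts) {
      await safeStreamCancel(forRequest); // may already be locked by fetch
      return sendWithRetry(forRetry, attempt + 1, maxAttempts);
    }

    // Success (or out of retries): release the unused backup branch.
    await safeStreamCancel(forRetry);
    return response;
  } catch (error) {
    if (attempt < maxAttempts) {
      // fetch crashed mid-read; forRequest is likely still locked.
      await safeStreamCancel(forRequest);
      return sendWithRetry(forRetry, attempt + 1, maxAttempts);
    }
    await safeStreamCancel(forRetry);
    throw error;
  }
}

The cancellation of whichever branch is abandoned matters because tee() buffers data for the slower branch, which is the same reason the patch cancels forRequest or forRetry at every exit point ("prevent tee() from buffering").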