diff --git a/.changeset/gentle-streams-flow.md b/.changeset/gentle-streams-flow.md new file mode 100644 index 0000000000..16113f7c8e --- /dev/null +++ b/.changeset/gentle-streams-flow.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Fix batch trigger failing with "ReadableStream is locked" error when network failures occur mid-stream. Added safe stream cancellation that gracefully handles locked streams during retry attempts. diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 7b07b2fb60..8efbc762ab 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -444,14 +444,14 @@ export class ApiClient { if (retryResult.retry) { // Cancel the request stream before retry to prevent tee() from buffering - await forRequest.cancel(); + await safeStreamCancel(forRequest); await sleep(retryResult.delay); // Use the backup stream for retry return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1); } // Not retrying - cancel the backup stream - await forRetry.cancel(); + await safeStreamCancel(forRetry); const errText = await response.text().catch((e) => (e as Error).message); let errJSON: Object | undefined; @@ -471,7 +471,7 @@ export class ApiClient { if (!parsed.success) { // Cancel backup stream since we're throwing - await forRetry.cancel(); + await safeStreamCancel(forRetry); throw new Error( `Invalid response from server for batch ${batchId}: ${parsed.error.message}` ); @@ -484,14 +484,14 @@ export class ApiClient { if (delay) { // Cancel the request stream before retry to prevent tee() from buffering - await forRequest.cancel(); + await safeStreamCancel(forRequest); // Retry with the backup stream await sleep(delay); return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1); } // No more retries - cancel backup stream and throw descriptive error - await forRetry.cancel(); + await safeStreamCancel(forRetry); throw 
new BatchNotSealedError({ batchId, enqueuedCount: parsed.data.enqueuedCount ?? 0, @@ -502,7 +502,7 @@ export class ApiClient { } // Success - cancel the backup stream to release resources - await forRetry.cancel(); + await safeStreamCancel(forRetry); return parsed.data; } catch (error) { @@ -519,13 +519,13 @@ export class ApiClient { const delay = calculateNextRetryDelay(retryOptions, attempt); if (delay) { // Cancel the request stream before retry to prevent tee() from buffering - await forRequest.cancel(); + await safeStreamCancel(forRequest); await sleep(delay); return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1); } // No more retries - cancel the backup stream - await forRetry.cancel(); + await safeStreamCancel(forRetry); // Wrap in a more descriptive error const cause = error instanceof Error ? error : new Error(String(error)); @@ -1731,6 +1731,30 @@ function sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } +/** + * Safely cancels a ReadableStream, handling the case where it might be locked. + * + * When fetch uses a ReadableStream as a request body and an error occurs mid-transfer + * (connection reset, timeout, etc.), the stream may remain locked by fetch's internal reader. + * Attempting to cancel a locked stream throws "Invalid state: ReadableStream is locked". + * + * This function gracefully handles that case by catching the error and doing nothing - + * the stream will be cleaned up by garbage collection when the reader is released. + */ +async function safeStreamCancel(stream: ReadableStream): Promise<void> { + try { + await stream.cancel(); + } catch (error) { + // Ignore "locked" errors - the stream will be cleaned up when the reader is released. + // This happens when fetch crashes mid-read and doesn't release the reader lock. 
+ if (error instanceof TypeError && String(error).includes("locked")) { + return; + } + // Re-throw unexpected errors + throw error; + } +} + // ============================================================================ // NDJSON Stream Helpers // ============================================================================ diff --git a/packages/core/src/v3/apiClient/streamBatchItems.test.ts b/packages/core/src/v3/apiClient/streamBatchItems.test.ts index c148f9b68e..23a42b5732 100644 --- a/packages/core/src/v3/apiClient/streamBatchItems.test.ts +++ b/packages/core/src/v3/apiClient/streamBatchItems.test.ts @@ -398,6 +398,58 @@ describe("streamBatchItems stream cancellation on retry", () => { expect(cancelCallCount).toBeGreaterThanOrEqual(1); }); + it("handles locked stream when connection error occurs mid-read", async () => { + // This test simulates the real-world scenario where fetch throws an error + // while still holding the reader lock on the request body stream. + // This can happen with connection resets, timeouts, or network failures. + let callIndex = 0; + + const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => { + const currentAttempt = callIndex; + callIndex++; + + if (init?.body && init.body instanceof ReadableStream) { + if (currentAttempt === 0) { + // First attempt: Get a reader and start reading, but throw while still holding the lock. + // This simulates a connection error that happens mid-transfer. 
+ const reader = init.body.getReader(); + await reader.read(); // Start reading + // DON'T release the lock - this simulates fetch crashing mid-read + throw new TypeError("Connection reset by peer"); + } + + // Subsequent attempts: consume and release normally + await consumeAndRelease(init.body); + } + + // Second attempt: success + return { + ok: true, + json: () => + Promise.resolve({ + id: "batch_test123", + itemsAccepted: 10, + itemsDeduplicated: 0, + sealed: true, + }), + }; + }); + globalThis.fetch = mockFetch; + + const client = new ApiClient("http://localhost:3030", "tr_test_key"); + + // This should NOT throw "ReadableStream is locked" error + // Instead it should gracefully handle the locked stream and retry + const result = await client.streamBatchItems( + "batch_test123", + [{ index: 0, task: "test-task", payload: "{}" }], + { retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } } + ); + + expect(result.sealed).toBe(true); + expect(mockFetch).toHaveBeenCalledTimes(2); + }); + it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => { let cancelCallCount = 0; let callIndex = 0;