5 changes: 5 additions & 0 deletions .changeset/gentle-streams-flow.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Fix batch trigger failing with a "ReadableStream is locked" error when network failures occur mid-stream. Added safe stream cancellation that gracefully handles locked streams during retry attempts.
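For context on the failure mode this patch addresses: the batch uploader tee()s the payload stream so one branch feeds fetch and the other is held back for a retry, and when fetch dies mid-transfer its internal reader can stay attached to the spent branch, so a plain `cancel()` rejects with "ReadableStream is locked". The sketch below is a minimal illustration of that scenario and of the safe-cancel pattern, not the package's actual retry loop; `safeCancel` and `demo` are hypothetical names, not exports of `@trigger.dev/core`, and it assumes a Node 18+ or browser environment where `ReadableStream` is global.

```ts
// Hypothetical sketch: a locked-tolerant cancel, mirroring the TypeError +
// "locked" check used in the patch. Not the library's exported API.
async function safeCancel(stream: ReadableStream<unknown>): Promise<void> {
  try {
    await stream.cancel();
  } catch (error) {
    // cancel() on a locked stream rejects with a TypeError; swallow that case
    // and let the stream be reclaimed once the reader lock is released.
    if (error instanceof TypeError && String(error).includes("locked")) {
      return;
    }
    throw error;
  }
}

async function demo(): Promise<void> {
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('{"index":0,"task":"example"}\n');
      controller.enqueue('{"index":1,"task":"example"}\n');
      controller.close();
    },
  });

  // tee() keeps a backup branch so a failed upload can be retried.
  const [forRequest, forRetry] = source.tee();

  // Simulate fetch dying mid-transfer while still holding the reader lock.
  const reader = forRequest.getReader();
  await reader.read();
  // ...connection resets here; the lock is never released.

  // A bare `await forRequest.cancel()` would now reject with a "locked"
  // TypeError; the wrapper returns quietly instead.
  await safeCancel(forRequest);

  // The retry can proceed with the untouched backup branch.
  await forRetry.cancel(); // (in a real retry it would be sent, not cancelled)
  console.log("locked stream handled; retry path is clear");
}

demo().catch(console.error);
```

Swallowing only the locked-state TypeError keeps genuine cancellation failures visible while still letting the retry continue with the backup branch.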
40 changes: 32 additions & 8 deletions packages/core/src/v3/apiClient/index.ts
@@ -444,14 +444,14 @@ export class ApiClient {

if (retryResult.retry) {
// Cancel the request stream before retry to prevent tee() from buffering
await forRequest.cancel();
await safeStreamCancel(forRequest);
await sleep(retryResult.delay);
// Use the backup stream for retry
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
}

// Not retrying - cancel the backup stream
await forRetry.cancel();
await safeStreamCancel(forRetry);

const errText = await response.text().catch((e) => (e as Error).message);
let errJSON: Object | undefined;
@@ -471,7 +471,7 @@

if (!parsed.success) {
// Cancel backup stream since we're throwing
await forRetry.cancel();
await safeStreamCancel(forRetry);
throw new Error(
`Invalid response from server for batch ${batchId}: ${parsed.error.message}`
);
@@ -484,14 +484,14 @@

if (delay) {
// Cancel the request stream before retry to prevent tee() from buffering
await forRequest.cancel();
await safeStreamCancel(forRequest);
// Retry with the backup stream
await sleep(delay);
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
}

// No more retries - cancel backup stream and throw descriptive error
await forRetry.cancel();
await safeStreamCancel(forRetry);
throw new BatchNotSealedError({
batchId,
enqueuedCount: parsed.data.enqueuedCount ?? 0,
@@ -502,7 +502,7 @@
}

// Success - cancel the backup stream to release resources
await forRetry.cancel();
await safeStreamCancel(forRetry);

return parsed.data;
} catch (error) {
@@ -519,13 +519,13 @@
const delay = calculateNextRetryDelay(retryOptions, attempt);
if (delay) {
// Cancel the request stream before retry to prevent tee() from buffering
await forRequest.cancel();
await safeStreamCancel(forRequest);
await sleep(delay);
return this.#streamBatchItemsWithRetry(batchId, forRetry, retryOptions, attempt + 1);
}

// No more retries - cancel the backup stream
await forRetry.cancel();
await safeStreamCancel(forRetry);

// Wrap in a more descriptive error
const cause = error instanceof Error ? error : new Error(String(error));
@@ -1731,6 +1731,30 @@ function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Safely cancels a ReadableStream, handling the case where it might be locked.
*
* When fetch uses a ReadableStream as a request body and an error occurs mid-transfer
* (connection reset, timeout, etc.), the stream may remain locked by fetch's internal reader.
* Attempting to cancel a locked stream throws "Invalid state: ReadableStream is locked".
*
* This function gracefully handles that case by catching the error and doing nothing -
* the stream will be cleaned up by garbage collection when the reader is released.
*/
async function safeStreamCancel(stream: ReadableStream<unknown>): Promise<void> {
try {
await stream.cancel();
} catch (error) {
// Ignore "locked" errors - the stream will be cleaned up when the reader is released.
// This happens when fetch crashes mid-read and doesn't release the reader lock.
if (error instanceof TypeError && String(error).includes("locked")) {
return;
}
// Re-throw unexpected errors
throw error;
}
}

// ============================================================================
// NDJSON Stream Helpers
// ============================================================================
52 changes: 52 additions & 0 deletions packages/core/src/v3/apiClient/streamBatchItems.test.ts
@@ -398,6 +398,58 @@ describe("streamBatchItems stream cancellation on retry", () => {
expect(cancelCallCount).toBeGreaterThanOrEqual(1);
});

it("handles locked stream when connection error occurs mid-read", async () => {
// This test simulates the real-world scenario where fetch throws an error
// while still holding the reader lock on the request body stream.
// This can happen with connection resets, timeouts, or network failures.
let callIndex = 0;

const mockFetch = vi.fn().mockImplementation(async (_url: string, init?: RequestInit) => {
const currentAttempt = callIndex;
callIndex++;

if (init?.body && init.body instanceof ReadableStream) {
if (currentAttempt === 0) {
// First attempt: Get a reader and start reading, but throw while still holding the lock.
// This simulates a connection error that happens mid-transfer.
const reader = init.body.getReader();
await reader.read(); // Start reading
// DON'T release the lock - this simulates fetch crashing mid-read
throw new TypeError("Connection reset by peer");
}

// Subsequent attempts: consume and release normally
await consumeAndRelease(init.body);
}

// Second and subsequent attempts: return a successful response
return {
ok: true,
json: () =>
Promise.resolve({
id: "batch_test123",
itemsAccepted: 10,
itemsDeduplicated: 0,
sealed: true,
}),
};
});
globalThis.fetch = mockFetch;

const client = new ApiClient("http://localhost:3030", "tr_test_key");

// This should NOT throw "ReadableStream is locked" error
// Instead it should gracefully handle the locked stream and retry
const result = await client.streamBatchItems(
"batch_test123",
[{ index: 0, task: "test-task", payload: "{}" }],
{ retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 } }
);

expect(result.sealed).toBe(true);
expect(mockFetch).toHaveBeenCalledTimes(2);
});

it("does not leak memory by leaving tee branches unconsumed during multiple retries", async () => {
let cancelCallCount = 0;
let callIndex = 0;