From ed412f7db5f87953c7fba096498321180adc6542 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 15 May 2026 17:11:50 -0700 Subject: [PATCH 1/2] stream: propagate abort reason in share and broadcast Pass signal.reason to the multi-consumer cancel paths so signal abort is reported as AbortError instead of clean iterator completion. Also make detached share consumers rethrow a stored source error when they resume after cancellation, preserving the abort reason for pending pulls. Fixes: https://github.com/nodejs/node/issues/63357 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/broadcast.js | 2 +- lib/internal/streams/iter/share.js | 3 +- .../test-stream-iter-broadcast-from.js | 21 +++---- test/parallel/test-stream-iter-share-async.js | 63 +++++++++++++++---- 4 files changed, 64 insertions(+), 25 deletions(-) diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 7b6fc3525d122f..8cbbf0bb501dc1 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -720,7 +720,7 @@ function broadcast(options = { __proto__: null }) { broadcastImpl.setWriter(writer); if (signal) { - onSignalAbort(signal, () => broadcastImpl.cancel()); + onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason)); } return { __proto__: null, writer, broadcast: broadcastImpl }; diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 0160bc7eace009..9cf169f8a5f3c1 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -144,6 +144,7 @@ class ShareImpl { // cursor must re-pull rather than terminating prematurely. for (;;) { if (state.detached) { + if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -657,7 +658,7 @@ function share(source, options = { __proto__: null }) { const shareImpl = new ShareImpl(normalized, opts); if (signal) { - onSignalAbort(signal, () => shareImpl.cancel()); + onSignalAbort(signal, () => shareImpl.cancel(signal.reason)); } return shareImpl; diff --git a/test/parallel/test-stream-iter-broadcast-from.js b/test/parallel/test-stream-iter-broadcast-from.js index 2f17b1a7de92fa..200d0a7022d09c 100644 --- a/test/parallel/test-stream-iter-broadcast-from.js +++ b/test/parallel/test-stream-iter-broadcast-from.js @@ -66,15 +66,13 @@ async function testBroadcastFromMultipleConsumers() { async function testAbortSignal() { const ac = new AbortController(); const { broadcast: bc } = broadcast({ signal: ac.signal }); - const consumer = bc.push(); + const iter = bc.push()[Symbol.asyncIterator](); + const read = iter.next(); + const rejected = assert.rejects(read, { name: 'AbortError' }); ac.abort(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await rejected; } async function testAlreadyAbortedSignal() { @@ -84,11 +82,12 @@ async function testAlreadyAbortedSignal() { const { broadcast: bc } = broadcast({ signal: ac.signal }); const consumer = bc.push(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 86b0eb9b273a34..8138414d264fb8 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -134,16 +134,53 @@ async function testShareCancelWithReason() { async function testShareAbortSignal() { const ac = new AbortController(); - const shared = share(from('data'), { signal: ac.signal }); - const consumer = shared.pull(); - + const enc = new TextEncoder(); + async function* source() { + yield [enc.encode('a')]; + yield [enc.encode('b')]; + } + const shared = share(source(), { + highWaterMark: 1, + backpressure: 'block', + signal: ac.signal, + }); + const fast = shared.pull()[Symbol.asyncIterator](); + shared.pull(); + + await fast.next(); + const read = fast.next(); + const rejected = assert.rejects(read, { name: 'AbortError' }); ac.abort(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); + await rejected; +} + +async function testShareAbortSignalWhileSourcePullPending() { + const ac = new AbortController(); + let resume; + let sourceStarted; + const sourceStartedPromise = new Promise((resolve) => { + sourceStarted = resolve; + }); + async function* source() { + await new Promise((resolve) => { + resume = resolve; + sourceStarted(); + }); } - assert.strictEqual(batches.length, 0); + const shared = share(source(), { signal: ac.signal }); + const iter1 = shared.pull()[Symbol.asyncIterator](); + const iter2 = shared.pull()[Symbol.asyncIterator](); + const read1 = iter1.next(); + const read2 = iter2.next(); + const rejected1 = assert.rejects(read1, { name: 'AbortError' }); + const rejected2 = assert.rejects(read2, { name: 'AbortError' }); + + await sourceStartedPromise; + ac.abort(); + resume(); + + await Promise.all([rejected1, rejected2]); } async function testShareAlreadyAborted() { @@ -153,11 +190,12 @@ async function testShareAlreadyAborted() { const shared = share(from('data'), { signal: ac.signal }); const consumer = shared.pull(); - const batches = []; - for await (const batch of consumer) { - batches.push(batch); - } - assert.strictEqual(batches.length, 0); + await assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of consumer) { + assert.fail('Should not reach here'); + } + }, { name: 'AbortError' }); } // ============================================================================= @@ -273,6 +311,7 @@ Promise.all([ testShareCancelMidIteration(), testShareCancelWithReason(), testShareAbortSignal(), + testShareAbortSignalWhileSourcePullPending(), testShareAlreadyAborted(), testShareSourceError(), testShareLateJoiningConsumer(), From 0b2f09f0b1da2e28445de49dfe7ce8462961a70e Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 15 May 2026 17:42:36 -0700 Subject: [PATCH 2/2] test: avoid no-yield generator in stream iter share test --- test/parallel/test-stream-iter-share-async.js | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 8138414d264fb8..f7a53a063cb3e0 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -162,13 +162,22 @@ async function testShareAbortSignalWhileSourcePullPending() { const sourceStartedPromise = new Promise((resolve) => { sourceStarted = resolve; }); - async function* source() { - await new Promise((resolve) => { - resume = resolve; - sourceStarted(); - }); - } - const shared = share(source(), { signal: ac.signal }); + const source = { + __proto__: null, + [Symbol.asyncIterator]() { + return { + __proto__: null, + async next() { + await new Promise((resolve) => { + resume = resolve; + sourceStarted(); + }); + return { __proto__: null, done: true, value: undefined }; + }, + }; + }, + }; + const shared = share(source, { signal: ac.signal }); const iter1 = shared.pull()[Symbol.asyncIterator](); const iter2 = shared.pull()[Symbol.asyncIterator](); const read1 = iter1.next();