diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index aa83c9636598ea..dd1f12ec8e11a9 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -127,7 +127,7 @@ class BroadcastImpl { // own internal AbortController that follows the external signal. // When no transforms, return rawConsumer directly (controller elided // per PULL-02 optimization -- no transforms means no signal recipient). - if (transforms.length > 0) { + if (transforms.length > 0 || options?.signal) { const pullArgs = [...transforms]; if (options?.signal) { ArrayPrototypePush(pullArgs, diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index f755550712efa7..e65a5eb648b620 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -97,7 +97,7 @@ class ShareImpl { const { transforms, options } = parsePullArgs(args); const rawConsumer = this.#createRawConsumer(); - if (transforms.length > 0) { + if (transforms.length > 0 || options?.signal) { if (options) { return pullWithTransforms(rawConsumer, ...transforms, options); } diff --git a/test/parallel/test-stream-iter-broadcast-basic.js b/test/parallel/test-stream-iter-broadcast-basic.js index 32c1750fb4cbfd..141a4ee1d83e64 100644 --- a/test/parallel/test-stream-iter-broadcast-basic.js +++ b/test/parallel/test-stream-iter-broadcast-basic.js @@ -173,6 +173,19 @@ async function testPendingNextSettlesAfterReturn() { assert.strictEqual(result.value, undefined); } +async function testPushAbortSignalRejectsPendingNext() { + const ac = new AbortController(); + const reason = new Error('push aborted'); + const { broadcast: bc } = broadcast(); + const iter = bc.push({ signal: ac.signal })[Symbol.asyncIterator](); + + const pendingNext = iter.next(); + const rejected = assert.rejects(pendingNext, (error) => error === reason); + ac.abort(reason); + + await rejected; +} + // ============================================================================= // Writer fail detaches consumers // ============================================================================= @@ -267,6 +280,7 @@ Promise.all([ testCancelWithReason(), testCancelWithFalsyReason(), testPendingNextSettlesAfterReturn(), + testPushAbortSignalRejectsPendingNext(), testFailDetachesConsumers(), testWriterFailIdempotent(), testLateJoinerSeesBufferedData(), diff --git a/test/parallel/test-stream-iter-share-async.js b/test/parallel/test-stream-iter-share-async.js index 7e1c06b6328f19..ad382c97e03cf7 100644 --- a/test/parallel/test-stream-iter-share-async.js +++ b/test/parallel/test-stream-iter-share-async.js @@ -196,6 +196,25 @@ async function testShareAbortSignalWhileSourcePullPending() { await Promise.all([rejected1, rejected2]); } +async function testSharePullAbortSignalRejectsPendingNext() { + const ac = new AbortController(); + const reason = new Error('pull aborted'); + const shared = share( + // eslint-disable-next-line require-yield + (async function* never() { + await new Promise(() => {}); + })(), + ); + const iter = shared.pull({ signal: ac.signal })[Symbol.asyncIterator](); + + const pendingNext = iter.next(); + const rejected = assert.rejects(pendingNext, (error) => error === reason); + ac.abort(reason); + + await rejected; + shared.cancel(); +} + async function testShareAlreadyAborted() { const shared = share(from('data'), { signal: AbortSignal.abort() }); const consumer = shared.pull(); @@ -340,6 +359,7 @@ Promise.all([ testShareCancelWithReason(), testShareAbortSignal(), testShareAbortSignalWhileSourcePullPending(), + testSharePullAbortSignalRejectsPendingNext(), testShareAlreadyAborted(), testShareSourceError(), testShareLateJoiningConsumer(),