Skip to content

Commit d2bf99e

Browse files
committed
stream: keep overlapping broadcast reads pending
Broadcast consumers may receive overlapping next() calls on the same iterator. Queue those reads so chunks satisfy them in call order. A single written chunk should resolve the earliest pending next(); later next() calls remain pending until more data is written or the broadcast completes. Fixes: #63499 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent 330f44f commit d2bf99e

2 files changed

Lines changed: 90 additions & 4 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
const {
1010
ArrayIsArray,
1111
ArrayPrototypePush,
12+
ArrayPrototypeShift,
1213
MathMax,
1314
PromisePrototypeThen,
1415
PromiseReject,
@@ -146,6 +147,7 @@ class BroadcastImpl {
146147
cursor: this.#bufferStart,
147148
resolve: null,
148149
reject: null,
150+
pending: [],
149151
detached: false,
150152
};
151153

@@ -165,9 +167,10 @@ class BroadcastImpl {
165167

166168
function detach() {
167169
state.detached = true;
168-
state.resolve?.({ __proto__: null, done: true, value: undefined });
169-
state.resolve = null;
170-
state.reject = null;
170+
if (state.resolve) {
171+
state.resolve({ __proto__: null, done: true, value: undefined });
172+
}
173+
self.#resolvePendingDone(state);
171174
if (self.#deleteConsumer(state)) {
172175
self.#tryTrimBuffer();
173176
}
@@ -208,6 +211,13 @@ class BroadcastImpl {
208211
return kDone;
209212
}
210213

214+
if (state.resolve) {
215+
const { promise, resolve, reject } = PromiseWithResolvers();
216+
ArrayPrototypePush(state.pending,
217+
{ __proto__: null, resolve, reject });
218+
return promise;
219+
}
220+
211221
const { promise, resolve, reject } = PromiseWithResolvers();
212222
state.resolve = resolve;
213223
state.reject = reject;
@@ -251,6 +261,11 @@ class BroadcastImpl {
251261
consumer.resolve = null;
252262
consumer.reject = null;
253263
}
264+
if (reason !== undefined) {
265+
this.#rejectPending(consumer, reason);
266+
} else {
267+
this.#resolvePendingDone(consumer);
268+
}
254269
consumer.detached = true;
255270
}
256271
this.#consumers.clear();
@@ -297,7 +312,7 @@ class BroadcastImpl {
297312
this.#ended = true;
298313

299314
for (const consumer of this.#consumers) {
300-
if (consumer.resolve) {
315+
while (consumer.resolve) {
301316
const bufferIndex = consumer.cursor - this.#bufferStart;
302317
if (bufferIndex < this.#buffer.length) {
303318
const chunk = this.#buffer.get(bufferIndex);
@@ -310,9 +325,15 @@ class BroadcastImpl {
310325
consumer.resolve({ __proto__: null, done: false, value: chunk });
311326
} else {
312327
consumer.resolve({ __proto__: null, done: true, value: undefined });
328+
this.#resolvePendingDone(consumer);
329+
consumer.detached = true;
313330
}
314331
consumer.resolve = null;
315332
consumer.reject = null;
333+
if (consumer.detached && this.#deleteConsumer(consumer)) {
334+
this.#tryTrimBuffer();
335+
break;
336+
}
316337
}
317338
}
318339
}
@@ -329,6 +350,7 @@ class BroadcastImpl {
329350
consumer.resolve = null;
330351
consumer.reject = null;
331352
}
353+
this.#rejectPending(consumer, reason);
332354
consumer.detached = true;
333355
}
334356
this.#consumers.clear();
@@ -397,6 +419,11 @@ class BroadcastImpl {
397419
consumer.resolve = null;
398420
consumer.reject = null;
399421
resolve({ __proto__: null, done: false, value: chunk });
422+
if (consumer.detached && this.#deleteConsumer(consumer)) {
423+
this.#tryTrimBuffer();
424+
} else if (this.#promotePending(consumer)) {
425+
ArrayPrototypePush(this.#waiters, consumer);
426+
}
400427
} else {
401428
// Still waiting -- put back
402429
ArrayPrototypePush(this.#waiters, consumer);
@@ -419,6 +446,31 @@ class BroadcastImpl {
419446
}
420447
return false;
421448
}
449+
450+
#promotePending(consumer) {
451+
const next = ArrayPrototypeShift(consumer.pending);
452+
if (next === undefined) return false;
453+
consumer.resolve = next.resolve;
454+
consumer.reject = next.reject;
455+
return true;
456+
}
457+
458+
#resolvePendingDone(consumer) {
459+
if (consumer.resolve) {
460+
consumer.resolve = null;
461+
consumer.reject = null;
462+
}
463+
while (consumer.pending.length > 0) {
464+
ArrayPrototypeShift(consumer.pending).resolve(
465+
{ __proto__: null, done: true, value: undefined });
466+
}
467+
}
468+
469+
#rejectPending(consumer, reason) {
470+
while (consumer.pending.length > 0) {
471+
ArrayPrototypeShift(consumer.pending).reject(reason);
472+
}
473+
}
422474
}
423475

424476
// =============================================================================

test/parallel/test-stream-iter-broadcast-basic.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
const common = require('../common');
55
const assert = require('assert');
6+
const { setTimeout } = require('timers/promises');
67
const { broadcast, text } = require('stream/iter');
78

89
// =============================================================================
@@ -255,6 +256,38 @@ async function testLateJoinerSeesBufferedData() {
255256
assert.strictEqual(result, 'before-join');
256257
}
257258

259+
async function testOverlappingNextKeepsEarlierRead() {
260+
const { writer, broadcast: bc } = broadcast();
261+
const it = bc.push()[Symbol.asyncIterator]();
262+
263+
const first = it.next();
264+
const second = it.next();
265+
266+
await writer.write('x');
267+
268+
const secondResult = await Promise.race([
269+
second.then((value) => ({ __proto__: null, settled: true, value })),
270+
setTimeout(common.platformTimeout(50),
271+
{ __proto__: null, settled: false }),
272+
]);
273+
assert.deepStrictEqual(secondResult, {
274+
__proto__: null,
275+
settled: false,
276+
});
277+
278+
const result = await first;
279+
assert.strictEqual(result.done, false);
280+
assert.strictEqual(Buffer.concat(result.value).toString(), 'x');
281+
282+
writer.endSync();
283+
assert.deepStrictEqual(await second, {
284+
__proto__: null,
285+
done: true,
286+
value: undefined,
287+
});
288+
assert.strictEqual(bc.consumerCount, 0);
289+
}
290+
258291
Promise.all([
259292
testBasicBroadcast(),
260293
testMultipleWrites(),
@@ -270,4 +303,5 @@ Promise.all([
270303
testFailDetachesConsumers(),
271304
testWriterFailIdempotent(),
272305
testLateJoinerSeesBufferedData(),
306+
testOverlappingNextKeepsEarlierRead(),
273307
]).then(common.mustCall());

0 commit comments

Comments
 (0)