From ea13e11c0d4b41eae3b1335e314c00b28bd82cac Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 14 Jun 2026 14:17:50 -0700 Subject: [PATCH] fix(moq-net): read frames in place, without minting a per-poll FrameConsumer GroupConsumer::poll_read_frame / poll_read_frame_chunks called poll_get_frame -> frame.consume() on every poll, then dropped that FrameConsumer whenever the frame's data wasn't complete yet (still in flight). A FrameConsumer is a kio consumer handle, so that create+drop flips the frame's consumer count 0->1->0 each poll, and kio wakes the state's waiters on both the first-appears and last-drops transitions -- the same waiters our own read registered on. Every poll re-woke itself: a silent busy spin. On a multi-threaded runtime the producer fills the frame concurrently so the spin ends in microseconds (wasted CPU, no visible hang). On a single-thread executor (e.g. wasm) the consumer's self-wake loop starves the producer, so the frame never completes and the spin runs away into a hard freeze. Read the frame in place instead of through a consumer handle: - kio: add `Producer::poll_ref`, a read-only counterpart to `Producer::poll` that registers a waiter on a read condition without taking a `Mut` (no modified flag, no consumer-count churn). - model/frame: `FrameProducer::poll_read_all` reads the producer's own buffer once finished, via poll_ref. Stateless (always offset 0), so parallel readers are fine. - model/group: `GroupState::poll_frame_read_all` reads the cached FrameProducer directly; poll_read_frame / poll_read_frame_chunks use it and no longer mint a FrameConsumer. GroupConsumer stays a plain derive(Clone) with no extra state. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/kio/src/producer.rs | 28 ++++++++++++++++++++++++++++ rs/moq-net/src/model/frame.rs | 29 +++++++++++++++++++++++++++++ rs/moq-net/src/model/group.rs | 34 +++++++++++++++++++++++++++------- 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/rs/kio/src/producer.rs b/rs/kio/src/producer.rs index 77ce38b0c..7db3eba6f 100644 --- a/rs/kio/src/producer.rs +++ b/rs/kio/src/producer.rs @@ -111,6 +111,34 @@ impl Producer { Poll::Pending } + /// Poll-based **read-only** access with waker registration. + /// + /// Like [`Self::poll`] but hands `f` a [`Ref`] instead of a [`Mut`], so it + /// never flags the state modified and never wakes consumers. Use it to wait on + /// a read condition (e.g. a `fin` flag) from the producer side without creating + /// a [`Consumer`] — creating/dropping a consumer churns the consumer count and + /// wakes the value waiters, which would spin a polling reader. + pub fn poll_ref(&self, waiter: &Waiter, mut f: F) -> Poll>> + where + F: FnMut(&Ref<'_, T>) -> Poll, + { + let state = self.state.lock(); + let state = Ref { state }; + + if let Poll::Ready(res) = f(&state) { + return Poll::Ready(Ok(res)); + } + + if state.state.closed { + return Poll::Ready(Err(state)); + } + + let mut state = state.state; + waiter.register(&mut state.waiters); + + Poll::Pending + } + /// Wait for the closure to return [`Poll::Ready`], re-polling on each state change. /// /// Returns `Ok(R)` when the closure returns [`Poll::Ready`], or `Err(Ref)` with diff --git a/rs/moq-net/src/model/frame.rs b/rs/moq-net/src/model/frame.rs index d46ad2521..85866fed3 100644 --- a/rs/moq-net/src/model/frame.rs +++ b/rs/moq-net/src/model/frame.rs @@ -226,6 +226,35 @@ impl FrameProducer { } } + /// Poll for the frame's full payload, resolving once it's finished. + /// + /// Reads from the producer side (via `kio::Producer::poll_ref`) so a polling + /// reader doesn't mint a transient [`FrameConsumer`] per poll: that would churn + /// the consumer count and wake the value waiters, spinning the poll. Always + /// returns the whole payload (offset 0), so it's safe to call from any number + /// of readers in parallel. + pub(crate) fn poll_read_all(&self, waiter: &kio::Waiter) -> Poll> { + let res = ready!(self.state.poll_ref(waiter, |state| { + if state.fin { + Poll::Ready(Ok(())) + } else if let Some(err) = &state.abort { + Poll::Ready(Err(err.clone())) + } else { + Poll::Pending + } + })); + + match res { + Ok(Ok(())) => { + // `fin` implies written == capacity (the producer fills the whole buffer). + let written = self.buf.written(Ordering::Acquire); + Poll::Ready(Ok(Bytes::from_owner(self.buf.clone()).slice(0..written))) + } + Ok(Err(err)) => Poll::Ready(Err(err)), + Err(state) => Poll::Ready(Err(state.abort.clone().unwrap_or(Error::Dropped))), + } + } + /// Block until there are no active consumers. pub async fn unused(&self) -> Result<()> { self.state diff --git a/rs/moq-net/src/model/group.rs b/rs/moq-net/src/model/group.rs index 0532e9ee2..c7b9d1e0d 100644 --- a/rs/moq-net/src/model/group.rs +++ b/rs/moq-net/src/model/group.rs @@ -104,6 +104,26 @@ impl GroupState { } } + /// Poll for the full payload of the frame at `index`, reading it in place. + /// + /// Unlike [`Self::poll_get_frame`] this never mints a [`FrameConsumer`] (which + /// would churn the frame's consumer count and wake its waiters every poll); it + /// reads the cached [`FrameProducer`] directly. `waiter` is registered on the + /// frame's state so the reader wakes when it finishes. + fn poll_frame_read_all(&self, index: usize, waiter: &kio::Waiter) -> Poll>> { + if index < self.offset { + return Poll::Ready(Err(Error::CacheFull)); + } + match self.frames.get(index - self.offset) { + Some(frame) => Poll::Ready(Ok(Some(ready!(frame.poll_read_all(waiter))?))), + None if self.fin => Poll::Ready(Ok(None)), + None => match &self.abort { + Some(err) => Poll::Ready(Err(err.clone())), + None => Poll::Pending, + }, + } + } + fn poll_finished(&self) -> Poll> { if self.fin { Poll::Ready(Ok((self.offset + self.frames.len()) as u64)) @@ -347,13 +367,12 @@ impl GroupConsumer { /// Read the next frame's data all at once, without blocking. pub fn poll_read_frame(&mut self, waiter: &kio::Waiter) -> Poll>> { - let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else { + let index = self.index; + let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else { return Poll::Ready(Ok(None)); }; - let data = ready!(frame.poll_read_all(waiter))?; self.index += 1; - Poll::Ready(Ok(Some(data))) } @@ -364,14 +383,15 @@ impl GroupConsumer { /// Read all of the chunks of the next frame, without blocking. pub fn poll_read_frame_chunks(&mut self, waiter: &kio::Waiter) -> Poll>>> { - let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else { + let index = self.index; + let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else { return Poll::Ready(Ok(None)); }; - let data = ready!(frame.poll_read_all_chunks(waiter))?; self.index += 1; - - Poll::Ready(Ok(Some(data))) + // In-place reads return the whole frame as one slice; keep the chunked API + // shape (empty payload -> no chunks). + Poll::Ready(Ok(Some(if data.is_empty() { Vec::new() } else { vec![data] }))) } /// Read all of the chunks of the next frame.