Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions rs/kio/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ impl<T> Producer<T> {
Poll::Pending
}

/// Poll-based **read-only** access with waker registration.
///
/// Like [`Self::poll`] but hands `f` a [`Ref`] instead of a [`Mut`], so it
/// never flags the state modified and never wakes consumers. Use it to wait on
/// a read condition (e.g. a `fin` flag) from the producer side without creating
/// a [`Consumer`] — creating/dropping a consumer churns the consumer count and
/// wakes the value waiters, which would spin a polling reader.
pub fn poll_ref<F, R>(&self, waiter: &Waiter, mut f: F) -> Poll<Result<R, Ref<'_, T>>>
where
F: FnMut(&Ref<'_, T>) -> Poll<R>,
{
let state = self.state.lock();
let state = Ref { state };

if let Poll::Ready(res) = f(&state) {
return Poll::Ready(Ok(res));
}

if state.state.closed {
return Poll::Ready(Err(state));
}

let mut state = state.state;
waiter.register(&mut state.waiters);

Poll::Pending
}

/// Wait for the closure to return [`Poll::Ready`], re-polling on each state change.
///
/// Returns `Ok(R)` when the closure returns [`Poll::Ready`], or `Err(Ref)` with
Expand Down
29 changes: 29 additions & 0 deletions rs/moq-net/src/model/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,35 @@ impl FrameProducer {
}
}

/// Poll for the frame's full payload, resolving once it's finished.
///
/// Reads from the producer side (via `kio::Producer::poll_ref`) so a polling
/// reader doesn't mint a transient [`FrameConsumer`] per poll: that would churn
/// the consumer count and wake the value waiters, spinning the poll. Always
/// returns the whole payload (offset 0), so it's safe to call from any number
/// of readers in parallel.
pub(crate) fn poll_read_all(&self, waiter: &kio::Waiter) -> Poll<Result<Bytes>> {
let res = ready!(self.state.poll_ref(waiter, |state| {
if state.fin {
Poll::Ready(Ok(()))
} else if let Some(err) = &state.abort {
Poll::Ready(Err(err.clone()))
} else {
Poll::Pending
}
}));

match res {
Ok(Ok(())) => {
// `fin` implies written == capacity (the producer fills the whole buffer).
let written = self.buf.written(Ordering::Acquire);
Poll::Ready(Ok(Bytes::from_owner(self.buf.clone()).slice(0..written)))
}
Ok(Err(err)) => Poll::Ready(Err(err)),
Err(state) => Poll::Ready(Err(state.abort.clone().unwrap_or(Error::Dropped))),
}
}

/// Block until there are no active consumers.
pub async fn unused(&self) -> Result<()> {
self.state
Expand Down
34 changes: 27 additions & 7 deletions rs/moq-net/src/model/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ impl GroupState {
}
}

/// Poll for the full payload of the frame at `index`, reading it in place.
///
/// Unlike [`Self::poll_get_frame`] this never mints a [`FrameConsumer`] (which
/// would churn the frame's consumer count and wake its waiters every poll); it
/// reads the cached [`FrameProducer`] directly. `waiter` is registered on the
/// frame's state so the reader wakes when it finishes.
fn poll_frame_read_all(&self, index: usize, waiter: &kio::Waiter) -> Poll<Result<Option<Bytes>>> {
if index < self.offset {
return Poll::Ready(Err(Error::CacheFull));
}
match self.frames.get(index - self.offset) {
Some(frame) => Poll::Ready(Ok(Some(ready!(frame.poll_read_all(waiter))?))),
None if self.fin => Poll::Ready(Ok(None)),
None => match &self.abort {
Some(err) => Poll::Ready(Err(err.clone())),
None => Poll::Pending,
},
}
}

fn poll_finished(&self) -> Poll<Result<u64>> {
if self.fin {
Poll::Ready(Ok((self.offset + self.frames.len()) as u64))
Expand Down Expand Up @@ -347,13 +367,12 @@ impl GroupConsumer {

/// Read the next frame's data all at once, without blocking.
pub fn poll_read_frame(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<Bytes>>> {
let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else {
let index = self.index;
let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else {
return Poll::Ready(Ok(None));
};

let data = ready!(frame.poll_read_all(waiter))?;
self.index += 1;

Poll::Ready(Ok(Some(data)))
Comment on lines 369 to 376

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Coordinate next_frame with the cached in-flight frame.

Persisting self.frame here makes GroupConsumer stateful across polls, but poll_next_frame still ignores that state. A reachable sequence is: poll_read_frame* caches frame N and returns Pending, next_frame then resolves frame N again and bumps index, and when the cached read finally completes it bumps index a second time. That duplicates frame N and skips frame N + 1.

Either have poll_next_frame return self.frame.take() before resolving a fresh frame, or explicitly reject switching APIs while frame.is_some(). Please add a regression test for the Pending -> next_frame -> complete path with the fix.

Suggested direction
 pub fn poll_next_frame(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<FrameConsumer>>> {
-	let Some(frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else {
-		return Poll::Ready(Ok(None));
-	};
+	let frame = if let Some(frame) = self.frame.take() {
+		frame
+	} else {
+		let Some(frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else {
+			return Poll::Ready(Ok(None));
+		};
+		frame
+	};
 
 	self.index += 1;
 	Poll::Ready(Ok(Some(frame)))
 }

Also applies to: 392-405

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-net/src/model/group.rs` around lines 370 - 383, The issue is that
poll_read_frame caches a frame in self.frame while poll_next_frame ignores this
cached state, causing frame duplication and skipping when APIs are mixed. Fix
this by having poll_next_frame check and return self.frame.take() before
resolving a fresh frame, ensuring the same frame is not processed twice.
Additionally, add a regression test that verifies the Pending to next_frame to
complete flow works correctly and doesn't duplicate or skip frames.

}

Expand All @@ -364,14 +383,15 @@ impl GroupConsumer {

/// Read all of the chunks of the next frame, without blocking.
pub fn poll_read_frame_chunks(&mut self, waiter: &kio::Waiter) -> Poll<Result<Option<Vec<Bytes>>>> {
let Some(mut frame) = ready!(self.poll(waiter, |state| state.poll_get_frame(self.index))?) else {
let index = self.index;
let Some(data) = ready!(self.poll(waiter, |state| state.poll_frame_read_all(index, waiter))?) else {
return Poll::Ready(Ok(None));
};

let data = ready!(frame.poll_read_all_chunks(waiter))?;
self.index += 1;

Poll::Ready(Ok(Some(data)))
// In-place reads return the whole frame as one slice; keep the chunked API
// shape (empty payload -> no chunks).
Poll::Ready(Ok(Some(if data.is_empty() { Vec::new() } else { vec![data] })))
}

/// Read all of the chunks of the next frame.
Expand Down
Loading