
Redesign streaming engine: per-request scheduling with local density projection #131

Merged
JonoPrest merged 21 commits into main from claude/eloquent-edison-WwqYH on Jun 8, 2026

@JonoPrest JonoPrest commented Jun 5, 2026

Collaborator

Summary

Completely redesign the stream_arrow streaming engine from a fixed-range, paginate-to-end model (v1) to a dynamic, earliest-hole-first scheduler (v2). The new engine makes one HTTP request the unit of work, always schedules the lowest-start un-fetched range (prioritizing truncation gaps over frontier extension), and projects each request's block span locally from the nearest completed neighbor's byte-density. Delivery remains gated on contiguity, so consumers see one ArrowResponse per HTTP response in block order — a drop-in replacement.
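
As a rough illustration of earliest-hole-first scheduling, the sketch below keeps un-fetched ranges in a `BTreeMap` keyed by start block, so the lowest-start hole is always the first entry. The `Holes` type and its method names are hypothetical and are not taken from `stream.rs`; ranges are assumed to be half-open `[start, end)`.

```rust
use std::collections::BTreeMap;

/// Un-fetched block ranges keyed by start block; the value is the exclusive end.
/// Hypothetical stand-in for the scheduler's `holes` state.
#[derive(Default)]
struct Holes {
    ranges: BTreeMap<u64, u64>,
}

impl Holes {
    /// Record `[start, end)` as still needing a fetch (empty ranges are ignored).
    fn insert(&mut self, start: u64, end: u64) {
        if start < end {
            self.ranges.insert(start, end);
        }
    }

    /// Take up to `span` blocks from the lowest-start hole and leave the
    /// remainder of that hole in place for a later request.
    fn take_earliest(&mut self, span: u64) -> Option<(u64, u64)> {
        let (&start, &end) = self.ranges.iter().next()?;
        self.ranges.remove(&start);
        let req_end = end.min(start.saturating_add(span.max(1)));
        self.insert(req_end, end);
        Some((start, req_end))
    }

    /// A response that stopped at `next_block` before `req_end` was truncated:
    /// the uncovered tail goes back into the hole set.
    fn on_response(&mut self, next_block: u64, req_end: u64) {
        self.insert(next_block, req_end);
    }
}
```

Because a truncated response re-inserts its tail at a lower start than the frontier, the next free worker picks up the gap before extending the frontier, without any separate priority logic.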

Key Changes

  • New streaming engine core (hypersync-client/src/stream.rs):

    • Removed BlockRangeIterator, run_query_to_end, and the shared atomic step / generation counter.
    • Introduced Scheduler struct (single-task, lock-free) that owns all state: holes (un-fetched ranges), completed (awaiting delivery), in_flight (active requests).
    • Implemented Fetcher trait for injectable fetch logic; ClientFetcher wraps get_arrow_with_size + map_responses.
    • Earliest-hole-first scheduling: always picks the lowest-start hole, naturally prioritizing gaps before frontier extension.
    • Per-request block projection: project_blocks() uses the nearest completed neighbor's byte-density (or fallback to batch_size on first wave) to aim at response_bytes_target.
    • Contiguity-gated delivery: deliver_ready() forwards chunks only when they abut the watermark, maintaining block-order guarantees.
    • Consumer backpressure: respects max_buffered_bytes to avoid unbounded reorder-buffer growth; the watermark hole is always exempt to prevent deadlock.
    • Truncation handling: residual gaps from truncated responses are automatically re-inserted and backfilled.
    • Open-ended support: tracks live archive height and extends the frontier hole when the chain advances mid-stream.
  • Metrics and observability (hypersync-client/src/metrics.rs — new file):

    • StreamObserver trait for pluggable metrics collection (cheap, non-blocking).
    • RequestStats: per-request telemetry (block ranges, response size, density, truncation, kind, latency).
    • StreamMetrics: thread-safe aggregate handle with atomics and a fixed histogram; zero overhead when not attached.
    • StreamSummary: final aggregate report (request counts, truncation rate, throughput, size distribution, buffer high-water, in-flight mean).
    • RequestKind enum: distinguishes frontier-extension from gap-fill requests.
  • Config updates (hypersync-client/src/config.rs):

    • batch_size: now a deliberately-overestimated initial value (not dynamically adjusted).
    • max_batch_size: changed from required u64 to optional Option<u64>; None (default) means no cap, allowing self-correcting overshoot.
    • min_batch_size: new field, hard lower clamp on projected block count.
    • response_bytes_target: new field, the density-projection target (replaces the old floor/ceiling pair).
    • max_buffered_bytes: new optional field, defaults to 2 * concurrency * response_bytes_target.
  • Public API (hypersync-client/src/lib.rs):

    • Added stream_arrow_with_observer() to attach an observer.
    • Exported metrics types: RequestKind, RequestStats, StreamMetrics, StreamObserver, StreamSummary, NUM_SIZE_BUCKETS, SIZE_BUCKET_LABELS.
  • Tuning CLI (examples/tune_stream/ — new):

    • tune_stream binary: sweeps a grid of StreamConfig variants and prints a comparison table of aggregate metrics.
    • Helps users pick optimal config (highest throughput, response sizes near target, low truncation).
    • Supports

https://claude.ai/code/session_01XCpb7XdYYgzmwDSU53wsdJ

Summary by CodeRabbit

  • New Features

    • Streaming engine v2 with ordered delivery, gap/backfill handling, head-following, and a new observer-enabled streaming API
    • Opt-in observability: per-request and aggregate streaming metrics and a thread-safe metrics observer
  • Configuration

    • Stream config semantics and defaults updated; new response-size target, optional caps, and workload presets
  • Examples

    • Added a CLI tuning example and sample query for stream benchmarking
  • Tests

    • Added unit and integration tests for streaming behaviors, presets, and metrics
  • Documentation

    • Updated streaming docs for v2 design, config changes, and observability

JonoPrest added 11 commits June 5, 2026 12:54
Document the planned redesign of the stream_arrow engine before implementation:

- earliest-hole-first scheduler with one HTTP request per work unit
- single-target local response-size projection (replaces the floor/ceiling band)
- concurrency 0 (error) and 1 (sequential) special cases
- truncation-below-half-target warning
- reverse-mode mirroring
- breaking StreamConfig change: response_bytes_floor + response_bytes_ceiling
  collapse into response_bytes_target
- node/python rollout (config edit + version bump); Go deferred

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
- Drop max_batch_size: earliest-hole backfilling makes an overestimated
  initial batch self-correcting, so no hard maximum block range is needed.
- Add max_buffered_bytes: bound the undelivered reorder buffer by bytes
  (not count) and pause look-ahead fetches under consumer backpressure,
  always exempting the watermark hole to avoid deadlock.
- Open-ended streams (no to_block) now follow the live archive_height
  reported in responses instead of a single start-time get_height snapshot,
  so a long stream finishes at the real tip instead of stopping short.
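
The backpressure rule can be sketched as a single predicate. This is a minimal sketch rather than the engine's code: `may_fetch` and its parameters are hypothetical names, and the strict `<` comparison against the cap is an assumption.

```rust
/// Decide whether a request starting at `hole_start` may be issued now.
/// `buffered_bytes` is the total size of completed-but-undelivered responses.
fn may_fetch(
    hole_start: u64,
    watermark: u64,
    buffered_bytes: u64,
    max_buffered_bytes: u64,
) -> bool {
    // The hole at the watermark is the one whose completion unblocks delivery,
    // so it is always allowed; otherwise a full buffer could never drain.
    if hole_start == watermark {
        return true;
    }
    // Look-ahead fetches pause once the reorder buffer reaches the byte cap.
    buffered_bytes < max_buffered_bytes
}
```

Bounding by bytes rather than by chunk count keeps memory predictable even when individual responses vary widely in size.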

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
Make max_buffered_bytes an Option<u64> whose None default resolves at stream
start to 2 * concurrency * response_bytes_target (~8 MB at defaults), so the
look-ahead buffer scales with the worker count instead of a fixed constant.
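
As a worked example of the default: with `response_bytes_target` at 400k and a concurrency of 10, `2 * 10 * 400_000 = 8_000_000` bytes. The concurrency of 10 is an assumption inferred from the ~8 MB figure, and `resolve_max_buffered_bytes` is a hypothetical helper name.

```rust
/// Resolve the optional cap at stream start: an explicit value is used as
/// given, `None` falls back to `2 * concurrency * response_bytes_target`.
fn resolve_max_buffered_bytes(
    explicit: Option<u64>,
    concurrency: u64,
    response_bytes_target: u64,
) -> u64 {
    explicit.unwrap_or_else(|| {
        2u64.saturating_mul(concurrency)
            .saturating_mul(response_bytes_target)
    })
}
```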

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
Instead of removing max_batch_size, make it Option<u64>: None (default) means
no cap so an overestimated range self-corrects via backfill, while Some(n)
lets a caller bound the number of blocks per chunk. Updates the projection
rule, the config diff/table, and the node/python rollout notes accordingly.
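
The projection rule with an optional cap could look roughly like the following. This is a sketch under assumptions: `ProjectionConfig` and `Neighbour` are hypothetical types, the real `project_blocks()` signature is not shown in this PR text, and the order in which the floor and the cap are applied is a guess.

```rust
/// Size and span of a completed neighbouring request (hypothetical type).
struct Neighbour {
    blocks: u64,
    response_bytes: u64,
}

/// The subset of stream config that the projection reads (hypothetical type).
struct ProjectionConfig {
    batch_size: u64,
    min_batch_size: u64,
    max_batch_size: Option<u64>,
    response_bytes_target: u64,
}

/// Project how many blocks to request so the response lands near the target.
fn project_blocks(cfg: &ProjectionConfig, neighbour: Option<&Neighbour>) -> u64 {
    let raw = match neighbour {
        Some(n) if n.blocks > 0 && n.response_bytes > 0 => {
            // blocks = target / (bytes per block) = target * blocks / bytes,
            // computed in u128 so the multiplication cannot overflow.
            let wide = cfg.response_bytes_target as u128 * n.blocks as u128
                / n.response_bytes as u128;
            wide.min(u64::MAX as u128) as u64
        }
        // First wave, or no usable neighbour: fall back to batch_size.
        _ => cfg.batch_size,
    };
    let floored = raw.max(cfg.min_batch_size).max(1);
    match cfg.max_batch_size {
        Some(cap) => floored.min(cap.max(1)),
        None => floored, // no cap: overshoot is corrected by backfill
    }
}
```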

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
Specify per-request RequestStats (response size vs target, projected vs actual
range, density, truncation, latency) and an aggregate StreamSummary (size-vs-
target histogram, truncation rate, throughput, buffer/concurrency saturation),
exposed via a binding-friendly StreamMetrics handle plus an optional Rust
StreamObserver trait. Add a tune_stream example that sweeps configs over a
query/range to find optimal defaults — usable by any client via a query JSON.

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
Switch to an explicit StreamObserver/StreamMetrics API (no serde-skip field on
StreamConfig, existing stream* signatures untouched) and require zero overhead
when no observer is attached: no RequestStats built, no timers, no histograms.
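
One way to get the "zero overhead when unattached" property is to branch on the optional observer before any timing or stats work happens. The sketch below assumes a simplified `RequestStats` and a one-method `StreamObserver`; the field names and the `fetch_observed` helper are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-request record; the real `RequestStats` carries more fields.
#[derive(Debug)]
struct RequestStats {
    from_block: u64,
    next_block: u64,
    response_bytes: u64,
    latency: Duration,
}

/// Minimal observer hook; only `on_request` is sketched here.
trait StreamObserver {
    fn on_request(&self, stats: &RequestStats);
}

/// Run one fetch. `fetch` returns `(next_block, response_bytes)`.
fn fetch_observed<F>(
    observer: Option<&dyn StreamObserver>,
    from_block: u64,
    fetch: F,
) -> (u64, u64)
where
    F: FnOnce(u64) -> (u64, u64),
{
    match observer {
        // No observer: no timer is started and no stats value is built.
        None => fetch(from_block),
        Some(obs) => {
            let started = Instant::now();
            let (next_block, response_bytes) = fetch(from_block);
            obs.on_request(&RequestStats {
                from_block,
                next_block,
                response_bytes,
                latency: started.elapsed(),
            });
            (next_block, response_bytes)
        }
    }
}
```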

Co-authored-by: claude <noreply@anthropic.com>

https://claude.ai/code/session_01ShDBcWzTkzV4J15S8vNwKT
…rver)

Self-contained observability primitives for the streaming engine v2:
- RequestStats / RequestKind: per-request metrics.
- StreamSummary: aggregate roll-up (truncation rate, throughput,
  size-vs-target histogram, block-range stats, buffer/in-flight).
- StreamObserver trait: explicit hook for metrics, zero overhead when
  unattached.
- StreamMetrics: thread-safe (atomics + fixed histogram) ready-made
  observer the caller passes in and reads back.
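
A thread-safe aggregate built from atomics and a fixed histogram might be sketched as follows. The six bucket boundaries follow the size-vs-target buckets listed in docs/STREAMING.md; the `Metrics` type, the `NUM_BUCKETS` constant, and which side of each boundary is inclusive are assumptions, not the real `StreamMetrics`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Buckets: <0.25, 0.25-0.5, 0.5-0.75, 0.75-1.0, 1.0-1.25, >1.25 (x target).
const NUM_BUCKETS: usize = 6;

/// Map a response size to its histogram bucket relative to the target.
fn size_bucket(response_bytes: u64, target: u64) -> usize {
    let ratio = response_bytes as f64 / target.max(1) as f64;
    if ratio < 0.25 {
        0
    } else if ratio < 0.5 {
        1
    } else if ratio < 0.75 {
        2
    } else if ratio < 1.0 {
        3
    } else if ratio <= 1.25 {
        4
    } else {
        5
    }
}

/// Lock-free counters that workers can update through a shared reference.
#[derive(Default)]
struct Metrics {
    requests: AtomicU64,
    truncated: AtomicU64,
    total_bytes: AtomicU64,
    histogram: [AtomicU64; NUM_BUCKETS],
}

impl Metrics {
    fn record(&self, response_bytes: u64, target: u64, truncated: bool) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.total_bytes.fetch_add(response_bytes, Ordering::Relaxed);
        if truncated {
            self.truncated.fetch_add(1, Ordering::Relaxed);
        }
        self.histogram[size_bucket(response_bytes, target)].fetch_add(1, Ordering::Relaxed);
    }

    /// Fraction of recorded requests that were truncated.
    fn truncation_rate(&self) -> f64 {
        let n = self.requests.load(Ordering::Relaxed);
        if n == 0 {
            0.0
        } else {
            self.truncated.load(Ordering::Relaxed) as f64 / n as f64
        }
    }
}
```

Relaxed ordering is enough here because the counters are independent tallies read only for reporting.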

Wired into lib.rs and re-exported. Unit tested.

Replaces the v1 paginate-to-end workers + shared atomic batch_size with a
single lock-free scheduler where one HTTP request is the unit of work:

- Earliest-hole-first scheduling: the next free worker always takes the
  lowest-start (reverse: highest-start) still-needed range, prioritising
  truncation gaps (backfill) over extending the frontier. Overshoot is
  self-correcting, so requests can be deliberately overestimated.
- Single-target projection: each request is sized from the nearest
  completed neighbour's byte-density to hit response_bytes_target; first
  wave / fallback uses batch_size.
- Contiguity-gated delivery via a watermark; still one ArrowResponse per
  response in block order (drop-in for consumers).
- Open-ended streams follow the live archive_height to the head.
- concurrency 0 errors, 1 streams sequentially, >=2 uses the scheduler;
  reverse mirrors forward.
- Consumer backpressure bounded by max_buffered_bytes (watermark hole
  exempt to avoid deadlock).
- Warns after 5 consecutive small-and-truncated responses.
- Per-request fetch is injectable for deterministic unit tests.
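
Contiguity-gated delivery via a watermark can be sketched with a map of completed chunks keyed by start block. `ReorderBuffer` and `Chunk` are hypothetical types for illustration; the real `deliver_ready()` sends `ArrowResponse` values to a channel rather than pushing into a `Vec`.

```rust
use std::collections::BTreeMap;

/// A completed response awaiting delivery (payload simplified to a label).
struct Chunk {
    next_block: u64,
    payload: &'static str,
}

/// Completed chunks keyed by start block, plus the delivery watermark.
struct ReorderBuffer {
    watermark: u64,
    completed: BTreeMap<u64, Chunk>,
}

impl ReorderBuffer {
    /// Forward every chunk that starts exactly at the watermark, advancing
    /// the watermark to that chunk's `next_block` each time.
    fn deliver_ready(&mut self, out: &mut Vec<&'static str>) {
        while let Some(chunk) = self.completed.remove(&self.watermark) {
            out.push(chunk.payload);
            self.watermark = chunk.next_block;
        }
    }
}
```

A chunk that finishes early simply waits in `completed` until everything before it has been delivered, which is what preserves block order for consumers.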

StreamConfig (breaking, minor): drop response_bytes_floor/ceiling for
response_bytes_target (400k); max_batch_size -> Option (None = no cap);
add max_buffered_bytes (None => 2*concurrency*target).

New zero-overhead Client::stream_arrow_with_observer entry point wiring
the StreamObserver/StreamMetrics handle through the engine.

Unit tests cover partition/contiguity invariants, truncation backfill,
sparse regions, backpressure, reverse, sequential, follow-to-head, the
warning, and observer aggregation.

Standalone CLI that runs a query (from JSON) under a grid of StreamConfigs
and prints a comparison table of the aggregate StreamMetrics (throughput,
truncation rate, size-vs-target, buffer/in-flight), plus a --single
detailed report and a TUNE_DEBUG per-request log mode. Takes a query JSON
so it is usable by any client language. Unit tests cover the bundled
example query and the sweep grid.

Two ignored (live-endpoint) tests for the rewritten engine:
- stream_arrow_with_observer: bounded forward stream is contiguous and
  fully covers the range; the StreamMetrics observer sees per-run
  aggregates (every block counted exactly once).
- reverse stream delivers block numbers in globally descending order.

Mark the Rust core as implemented and align the observability section
with the shipped design: metrics flow through the new zero-overhead
stream_arrow_with_observer entry point (StreamConfig untouched), with the
StreamObserver trait gaining on_progress; resolve the open question to
the handle + trait option.

coderabbitai Bot commented Jun 5, 2026

Contributor


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

This PR redesigns the HyperSync streaming engine around a v2 scheduler, adds metrics and observability primitives, updates core StreamConfig fields and presets, and ships a tuning CLI example with docs and tests.

Changes

Streaming engine v2 and observability rollout

| Layer | File(s) | Summary |
|---|---|---|
| Config and public observability contracts | hypersync-client/src/config.rs, hypersync-client/src/metrics.rs, hypersync-client/src/lib.rs | StreamConfig field types change (max_batch_size: Option<u64>, removal of response_bytes_ceiling/response_bytes_floor, addition of response_bytes_target and max_buffered_bytes). New public metrics contracts added: RequestKind, RequestStats, StreamSummary, StreamObserver trait and re-exports. Client::stream_arrow_with_observer entry point is added. |
| Thread-safe metrics aggregation implementation | hypersync-client/src/metrics.rs | StreamMetrics provides atomic counters, histogram bins, in-flight/buffer tracking, monotonic elapsed-time recording, and a summary() snapshot. StreamMetrics implements StreamObserver. Unit tests validate buckets, aggregation, throughput, and request-kind accounting. |
| Streaming engine v2 scheduler runtime | hypersync-client/src/stream.rs, hypersync-client/tests/api_test.rs | Replaces prior iterator-based orchestration with a lock-free Scheduler that tracks holes/completed chunks, projects request spans from density anchors, handles truncation backfill, enforces backpressure via max_buffered_bytes, supports forward/reverse and sequential modes, and emits observer callbacks. Includes extensive unit and ignored integration tests covering ordering, completeness, backpressure, truncation warnings, and observer reporting. |
| Tune stream example crate and CLI workflow | Cargo.toml, examples/tune_stream/Cargo.toml, examples/tune_stream/query.example.json, examples/tune_stream/src/main.rs | New example crate tune_stream provides a CLI that loads a JSON Query, builds a sweep grid of StreamConfig variants (varying response_bytes_target and concurrency), runs stream_arrow_with_observer with a TuneObserver forwarding to StreamMetrics, and prints per-config reports or a comparison table. Includes tests for query deserialization and grid generation. |
| Streaming v2 specification and rollout notes | docs/STREAMING.md | Docs updated to describe v2 scheduler model, algorithm, contiguity-gated delivery, range projection, breaking config changes, presets, rollout guidance, and observability/metrics surfaces including the tune_stream workflow. |

Sequence Diagram(s)

sequenceDiagram
  participant ClientAPI
  participant RunStream
  participant Scheduler
  participant ClientFetcher
  participant StreamObserver
  participant Receiver
  ClientAPI->>RunStream: stream_arrow_with_observer(query, config, observer)
  RunStream->>Scheduler: initialize state and mode
  Scheduler->>ClientFetcher: fetch planned block range
  ClientFetcher-->>Scheduler: FetchOutcome(next_block, archive_height, response_bytes, response)
  Scheduler->>StreamObserver: on_request / on_progress
  Scheduler->>Receiver: send contiguous ArrowResponse by watermark
  RunStream->>StreamObserver: on_finish(summary)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'Redesign streaming engine: per-request scheduling with local density projection' accurately and specifically describes the main architectural changes in the changeset, matching the core redesign work in stream.rs. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Relocate STREAMING.md to docs/STREAMING.md and update the in-code
reference in stream.rs.

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@hypersync-client/src/stream.rs`:
- Around line 523-524: The code uses outcome.next_block.clamp(fr.start + 1,
fr.req_end) (and similarly at the other site) which artificially forces progress
(+1) and can hide invalid non-progress responses; instead validate
outcome.next_block explicitly: require outcome.next_block > fr.start &&
outcome.next_block <= fr.req_end (or otherwise treat it as an error), set
covered = outcome.next_block (no +1/clamp), compute truncated = covered <
fr.req_end, and return or propagate an error when outcome.next_block is out of
range so non-progress responses are not silently converted into synthetic
progress (apply same change to both occurrences referencing outcome.next_block,
fr.start, fr.req_end, covered, and truncated).

In `@STREAMING.md`:
- Line 85: The fenced code block in STREAMING.md (the diagram block) is missing
a language tag and triggers markdownlint MD040; update the fence from ``` to
```text so the block is explicitly marked as plain text (i.e., change the
opening triple backticks for the diagram block to ```text).
- Around line 457-459: The docs describe fields (p50/p90/p99 response_bytes)
that don't exist on the exported StreamSummary type; update STREAMING.md to
match the public struct in hypersync-client/src/metrics.rs by removing the
percentile claims or replacing them with the actual exposed members (histogram,
mean/min/max, size_ratio) and clarify that percentiles are represented via the
histogram buckets rather than explicit p50/p90/p99 fields; reference
StreamSummary and its histogram/mean/min/max/size_ratio members when editing the
prose so the documentation matches the exported type.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 65c7bd1f-803c-4998-b551-eb1946a1c99b

📥 Commits

Reviewing files that changed from the base of the PR and between 9a13892 and dbad716.

📒 Files selected for processing (10)
  • Cargo.toml
  • STREAMING.md
  • examples/tune_stream/Cargo.toml
  • examples/tune_stream/query.example.json
  • examples/tune_stream/src/main.rs
  • hypersync-client/src/config.rs
  • hypersync-client/src/lib.rs
  • hypersync-client/src/metrics.rs
  • hypersync-client/src/stream.rs
  • hypersync-client/tests/api_test.rs

Comment thread hypersync-client/src/stream.rs Outdated
Comment thread docs/STREAMING.md Outdated
Comment thread docs/STREAMING.md Outdated

A live smoke test (all-ERC20-transfers, narrow column selection) showed
the server routinely caps such queries far below response_bytes_target on
row-count/time, so ~75% of responses are truncated-and-small on a
perfectly healthy stream (14k blocks/s). With the old threshold of 5 the
diagnostic warning fired immediately and repeatedly — pure noise.

Since the consecutive counter resets on any healthy response, a much
higher threshold only fires when the server is *persistently* capping
responses below target with no relief (the genuine execution-time/scan
limit the warning is meant to flag). Verified live: no warning at 100.

Replace the flaky end-to-end warning test with deterministic state-machine
tests of update_warning (fires once per sustained run, silent on broken
runs, ignores large truncations).
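
The consecutive-counter behaviour described above can be sketched as a small state machine. The name `update_warning` appears in this commit message, but the signature, the `WarningState` type, and the "small means below half the target" test used here are assumptions; the threshold is passed in rather than hard-coded because its exact value is not stated.

```rust
/// Tracks a run of consecutive small-and-truncated responses.
#[derive(Default)]
struct WarningState {
    consecutive: u32,
    fired_this_run: bool,
}

/// Returns `true` exactly once per sustained run of small-and-truncated
/// responses that reaches `threshold`; any other response resets the run.
fn update_warning(
    state: &mut WarningState,
    threshold: u32,
    truncated: bool,
    response_bytes: u64,
    target: u64,
) -> bool {
    let small_and_truncated = truncated && response_bytes < target / 2;
    if !small_and_truncated {
        // Healthy responses and large truncations both break the run.
        state.consecutive = 0;
        state.fired_this_run = false;
        return false;
    }
    state.consecutive = state.consecutive.saturating_add(1);
    if state.consecutive >= threshold && !state.fired_this_run {
        state.fired_this_run = true;
        return true;
    }
    false
}
```

Keeping the logic in a pure function over explicit state is what makes it testable deterministically, without driving a full stream.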

@coderabbitai coderabbitai Bot left a comment

Contributor


♻️ Duplicate comments (2)
docs/STREAMING.md (2)

85-85: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add a language tag to the fenced code block.

The diagram fence is missing a language specifier, triggering markdownlint MD040. Use ```text to explicitly mark it as plain text.

📝 Proposed fix
-```
+```text
 forward stream, blocks increasing →
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/STREAMING.md` at line 85, The fenced code block in STREAMING.md that
contains the diagram line "forward stream, blocks increasing →" is missing a
language tag and triggers markdownlint MD040; update that fenced block to start
with ```text (i.e., replace the opening ``` with ```text) so the diagram is
explicitly marked as plain text and the lint rule is satisfied.

466-468: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove claims about percentile fields not in StreamSummary.

The docs mention p50/p90/p99 response_bytes fields, but the exported StreamSummary type in hypersync-client/src/metrics.rs exposes histogram buckets plus mean/min/max, not explicit percentile fields. Update the bullet to match the actual public API.

📝 Proposed fix
-- **size-vs-target distribution**: mean `size_ratio`, p50/p90/p99 `response_bytes`, and
-  histogram buckets relative to target (`<0.25 / 0.25–0.5 / 0.5–0.75 / 0.75–1.0 / 1.0–1.25 /
-  >1.25 ×target`)
+- **size-vs-target distribution**: mean `size_ratio` and histogram buckets relative to target
+  (`<0.25 / 0.25–0.5 / 0.5–0.75 / 0.75–1.0 / 1.0–1.25 / >1.25 ×target`)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/STREAMING.md` around lines 466 - 468, The docs currently claim
p50/p90/p99 `response_bytes` exist, but the public type StreamSummary (in
hypersync-client/src/metrics.rs) exposes histogram buckets plus mean/min/max and
not explicit percentile fields; update the "size-vs-target distribution" bullet
in docs/STREAMING.md to remove the p50/p90/p99 claims and instead describe the
available metrics (mean size_ratio, min/max/mean response_bytes and the
histogram buckets relative to target), matching the fields and names of
StreamSummary.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@docs/STREAMING.md`:
- Line 85: The fenced code block in STREAMING.md that contains the diagram line
"forward stream, blocks increasing →" is missing a language tag and triggers
markdownlint MD040; update that fenced block to start with ```text (i.e.,
replace the opening ``` with ```text) so the diagram is explicitly marked as
plain text and the lint rule is satisfied.
- Around line 466-468: The docs currently claim p50/p90/p99 `response_bytes`
exist, but the public type StreamSummary (in hypersync-client/src/metrics.rs)
exposes histogram buckets plus mean/min/max and not explicit percentile fields;
update the "size-vs-target distribution" bullet in docs/STREAMING.md to remove
the p50/p90/p99 claims and instead describe the available metrics (mean
size_ratio, min/max/mean response_bytes and the histogram buckets relative to
target), matching the fields and names of StreamSummary.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e9ec78c3-e4a9-405a-bcfb-2515ef289b1b

📥 Commits

Reviewing files that changed from the base of the PR and between 034fd4b and 9de15b9.

📒 Files selected for processing (2)
  • docs/STREAMING.md
  • hypersync-client/src/stream.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • hypersync-client/src/stream.rs

JonoPrest added 3 commits June 5, 2026 15:28
Add doc comments to the non-self-describing fields on FetchResult and the
Scheduler struct. No behaviour change.

Driven by live benchmarking (dense/sparse/heavy queries via the tuning
example):

1. Adaptive max_buffered_bytes default. When unset, the cap now grows to
   2*concurrency*max(target, largest_response_seen) instead of being fixed
   at 2*concurrency*target. Byte-heavy queries (full block+tx pulls) return
   responses many times larger than the target; a fixed cap was smaller than
   a single response and throttled look-ahead to ~1-2 in-flight. Measured:
   ~2x throughput on a full block+tx pull (mean in-flight ~2 -> ~8). An
   explicit cap is still honoured verbatim and never grown.

2. Workload presets: StreamConfig::dense() / sparse() / archival(), each
   documented with the workload it targets and why its knobs are set that
   way (dense = more concurrency; sparse = moderate concurrency + bigger
   first wave; archival = lean on the adaptive buffer).

3. Quieter truncation warning. It now fires at most once per stream and only
   when responses are truncated AND tiny AND cover fewer than min_batch_size
   blocks (server stalled below our smallest range). Healthy compact queries
   (truncated + under-target but covering a normal block range, streaming
   fast) no longer warn. Verified live: heavy pipelines by default, narrow
   ERC-20 query stays silent at 85% truncation / 32k blocks/s.

Unit tests cover the adaptive cap (grows on large responses, ignores
sub-target, explicit cap untouched), the presets, and the warning state
machine (fires once, silent on broken runs, silent on healthy-small).
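
The adaptive default from item 1 can be sketched as a cap that is recomputed from the largest response seen so far. `BufferCap` and its methods are hypothetical names; only the formula `2*concurrency*max(target, largest_response_seen)` and the rule that an explicit cap is never grown come from the commit message.

```rust
/// Tracks the inputs needed to compute the current reorder-buffer cap.
struct BufferCap {
    explicit: Option<u64>,
    concurrency: u64,
    response_bytes_target: u64,
    largest_response_seen: u64,
}

impl BufferCap {
    fn new(explicit: Option<u64>, concurrency: u64, response_bytes_target: u64) -> Self {
        Self {
            explicit,
            concurrency,
            response_bytes_target,
            largest_response_seen: 0,
        }
    }

    /// Remember the largest response size observed on this stream.
    fn observe(&mut self, response_bytes: u64) {
        self.largest_response_seen = self.largest_response_seen.max(response_bytes);
    }

    /// Current cap: an explicit value verbatim, otherwise the adaptive default.
    fn current(&self) -> u64 {
        match self.explicit {
            Some(cap) => cap,
            None => {
                let per_response = self.response_bytes_target.max(self.largest_response_seen);
                2u64.saturating_mul(self.concurrency)
                    .saturating_mul(per_response)
            }
        }
    }
}
```

With a concurrency of 10 and a 400k target (illustrative values), a single 5 MB response raises the cap from 8 MB to 100 MB, so one oversized response no longer fills the whole buffer.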
Stream all blocks over a range (include_all_blocks, heavyweight fields to
span many responses) and assert the delivered block numbers are every
number in the range exactly once, contiguous and in order — forward
(ascending) and reverse (descending). This verifies the partition/
contiguity invariant against real chain data, not just the mock-driven
unit tests. Both verified passing against eth.hypersync.xyz.
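
The invariant those tests assert can be expressed as a small checker. This is an illustrative helper, not the test code from `api_test.rs`: `is_contiguous_cover` is a hypothetical name and the range is assumed to be half-open `[from, to)`.

```rust
/// Check that `delivered` contains every block in `[from, to)` exactly once,
/// in ascending order (or descending order when `reverse` is true).
fn is_contiguous_cover(delivered: &[u64], from: u64, to: u64, reverse: bool) -> bool {
    let expected_len = to.saturating_sub(from) as usize;
    if delivered.len() != expected_len {
        return false;
    }
    delivered.iter().enumerate().all(|(i, &block)| {
        // The length check above guarantees `i < to - from`, so neither
        // expression can underflow or run past the range.
        let want = if reverse {
            to - 1 - i as u64
        } else {
            from + i as u64
        };
        block == want
    })
}
```

Comparing position by position catches gaps, duplicates, and ordering errors in a single pass.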

@coderabbitai coderabbitai Bot left a comment

Contributor


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@hypersync-client/src/config.rs`:
- Around line 180-189: The docs claim an explicit max_buffered_bytes is honored
verbatim but Scheduler::new() currently clamps with .max(1), so Some(0) is
silently converted to 1; change this by validating the config instead: in the
config-loading/validation path (where max_buffered_bytes is deserialized) detect
Some(0) and return an explicit error (or map it to a clear validation error) so
callers cannot pass zero, and remove or avoid the .max(1) clamp in
Scheduler::new() (or at least document the clamp) so the runtime behavior
matches the public contract; reference the max_buffered_bytes field and
Scheduler::new() & the existing .max(1) clamp when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5edced9a-3658-434f-bdfc-fe7efb056f59

📥 Commits

Reviewing files that changed from the base of the PR and between 723edf2 and 9e06f2d.

📒 Files selected for processing (3)
  • docs/STREAMING.md
  • hypersync-client/src/config.rs
  • hypersync-client/src/stream.rs
✅ Files skipped from review due to trivial changes (1)
  • docs/STREAMING.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • hypersync-client/src/stream.rs

Comment thread hypersync-client/src/config.rs
JonoPrest added 4 commits June 8, 2026 08:17
- stream.rs: stop fabricating progress. The defensive clamp/max forced
  next_block to start+1, which on a misbehaving server could silently drop
  (no-progress) or duplicate (over-claim) blocks and corrupt the partition
  invariant. Now validate next_block is in (start, req_end] and error the
  stream loudly otherwise, in both the scheduler and sequential paths.
- docs/STREAMING.md: add a 'text' language tag to the block-space diagram
  fence (markdownlint MD040).
- docs/STREAMING.md: correct the StreamSummary description — it exposes a
  histogram + means/min/max, not p50/p90/p99 or a median; note exact
  percentiles come from a custom StreamObserver over raw RequestStats.
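
The `next_block` check from the first bullet might look like the sketch below. `Coverage`, `InvalidNextBlock`, and `validate_next_block` are hypothetical names; only the rule that `next_block` must lie in `(start, req_end]` and that anything else is an error comes from the commit message.

```rust
/// What a valid response covered, relative to the requested range.
#[derive(Debug, PartialEq)]
enum Coverage {
    /// The whole requested range was covered.
    Full,
    /// The server stopped early; `[resume_at, req_end)` still needs a fetch.
    Truncated { resume_at: u64 },
}

/// Error for a response whose `next_block` is outside `(start, req_end]`.
#[derive(Debug, PartialEq)]
struct InvalidNextBlock {
    start: u64,
    req_end: u64,
    next_block: u64,
}

/// Validate the server-reported `next_block` instead of clamping it.
fn validate_next_block(
    start: u64,
    req_end: u64,
    next_block: u64,
) -> Result<Coverage, InvalidNextBlock> {
    // No progress (<= start) would drop blocks if papered over; an over-claim
    // (> req_end) would duplicate blocks owned by another request.
    if next_block <= start || next_block > req_end {
        return Err(InvalidNextBlock { start, req_end, next_block });
    }
    if next_block < req_end {
        Ok(Coverage::Truncated { resume_at: next_block })
    } else {
        Ok(Coverage::Full)
    }
}
```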

Validated against the live API suite: 20/20 eth.hypersync.xyz tests pass
(incl. forward/reverse all-blocks contiguity); the lone failure is a
connection error to the mev-commit endpoint, unrelated to these changes.

The docs promised an explicit cap is honoured verbatim, but Scheduler::new
clamped the resolved value with .max(1), silently turning Some(0) into 1.
Drop the clamp: Some(0) now means 'no look-ahead buffer' — only the
watermark chunk is fetched (sequential, minimal memory), which still
completes since the watermark hole is always exempt from backpressure. The
adaptive default is unaffected (always >= 2). Documented on the field; unit
tests cover the verbatim-zero value and that a zero-buffer stream fully
covers its range without deadlock.

The mev-commit chain was removed (2025-12-08), so
test_decode_string_param_into_arrow could no longer connect. Repoint it at
ENS NameRegistered
(string name, ...) on eth mainnet — same string->Arrow utf8 decode path —
and add a real assertion (decodes a non-empty utf8 'name' column) instead
of the previous dbg!. Verified live: decodes 5123 names (e.g. 'etherocean').

Run the #[ignore] live api_test suite against eth.hypersync.xyz using the
ENVIO_API_TOKEN repo secret. Skips PRs from forks (where the secret is
unavailable) and runs on push to main and same-repo PRs; 20-minute timeout.

@JonoPrest JonoPrest requested a review from JasoonS June 8, 2026 08:56

The v2 streaming engine changes StreamConfig (drops response_bytes_floor/
ceiling, makes max_batch_size optional, adds response_bytes_target and
max_buffered_bytes). Per docs/STREAMING.md §13 this source-breaking config
change ships as a deliberate minor bump. Only hypersync-client changed;
the format/net-types/schema crates are untouched.

Comment on lines +257 to +308

    /// Preset for **dense** workloads: queries that match a lot of data per block
    /// (busy contracts, all-logs, popular ERC-20 transfers).
    ///
    /// Such streams are throughput-bound and scale well with parallelism, so this
    /// raises `concurrency` above the default. The default `response_bytes_target`
    /// (400 KB) is already a good fit — benchmarking showed dense responses
    /// plateau near that size, and pushing the target higher mostly adds
    /// truncation/backfill rather than bigger responses.
    ///
    /// `max_buffered_bytes` is left unset so the adaptive default applies. If you
    /// have plenty of rate-limit headroom you can push `concurrency` higher still.
    pub fn dense() -> Self {
        Self {
            concurrency: 20,
            response_bytes_target: Self::default_response_bytes_target(),
            ..Self::default()
        }
    }

    /// Preset for **sparse** workloads: selective queries over wide block ranges
    /// (rare events, low-volume contracts) where most blocks match nothing.
    ///
    /// Here latency, not bytes, dominates, and benchmarking showed that *high*
    /// concurrency actually hurts: extra workers just fragment a large empty
    /// region into more (smaller) requests. So this keeps concurrency moderate and
    /// raises `batch_size` so the first wave covers a lot of ground before
    /// per-request projection kicks in — an over-estimate that self-corrects via
    /// backfill if it hits a dense patch.
    pub fn sparse() -> Self {
        Self {
            concurrency: Self::default_concurrency(),
            batch_size: 20_000,
            ..Self::default()
        }
    }

    /// Preset for **archival / byte-heavy** workloads: full block + transaction
    /// pulls (e.g. `include_all_blocks` with wide field selection) where each
    /// response is many megabytes.
    ///
    /// These streams are bounded by the reorder buffer, not concurrency: a single
    /// response can dwarf `response_bytes_target`, so the adaptive
    /// `max_buffered_bytes` default (left unset here) is what keeps the pipeline
    /// full — in benchmarks it roughly doubled throughput versus a buffer sized to
    /// the target. Concurrency past ~10–15 gives little extra here.
    pub fn archival() -> Self {
        Self {
            concurrency: 12,
            ..Self::default()
        }
    }

Collaborator Author


Using the observer, I created these 3 presets for config to try to get max perf out of the streaming.

@JasoonS JasoonS Jun 8, 2026

Collaborator


This is pretty cool!

/// number exactly once, contiguous and strictly in order — the partition
/// invariant, verified against real chain data rather than a mock.
#[tokio::test(flavor = "multi_thread")]
#[ignore]

Collaborator


I'm curious where you want to run these ignored tests.

Collaborator Author


I have, and I added them to the CI pipeline 👍🏼 Check the job called integration tests.

@JonoPrest JonoPrest merged commit 164d6ab into main Jun 8, 2026
6 checks passed