Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d04f2db
Add streaming engine v2 design doc
JonoPrest Jun 5, 2026
05ca081
Refine streaming design doc per review
JonoPrest Jun 5, 2026
a55adf6
Pin down max_buffered_bytes default in design doc
JonoPrest Jun 5, 2026
64e6ce9
Keep max_batch_size as an optional per-chunk block cap
JonoPrest Jun 5, 2026
f306caf
Add observability & tuning section to streaming design doc
JonoPrest Jun 5, 2026
75c9466
Finalize observability API in design doc
JonoPrest Jun 5, 2026
a23dfc4
Add streaming metrics module (RequestStats, StreamMetrics, StreamObse…
JonoPrest Jun 5, 2026
b2da589
Rewrite streaming engine: earliest-hole-first scheduler (v2)
JonoPrest Jun 5, 2026
d254ec9
Add tune_stream config-sweep example
JonoPrest Jun 5, 2026
e71103c
Add v2 streaming parity tests to api_test
JonoPrest Jun 5, 2026
dbad716
Update streaming design doc to match the v2 implementation
JonoPrest Jun 5, 2026
034fd4b
Move streaming design doc into docs/
JonoPrest Jun 5, 2026
9de15b9
Raise stream truncation-warning threshold to 100
JonoPrest Jun 5, 2026
723edf2
Document FetchResult and Scheduler fields
JonoPrest Jun 5, 2026
9e06f2d
Adaptive reorder buffer, workload presets, quieter truncation warning
JonoPrest Jun 8, 2026
f295027
Add live all-blocks contiguity assertions to api_test
JonoPrest Jun 8, 2026
155c242
Address CodeRabbit review on PR #131
JonoPrest Jun 8, 2026
d188605
Honor max_buffered_bytes = 0 verbatim (CodeRabbit)
JonoPrest Jun 8, 2026
d472517
Repurpose string-param decode test off the deprecated mev-commit chain
JonoPrest Jun 8, 2026
384e85c
ci: add live integration-tests job
JonoPrest Jun 8, 2026
0140dd0
Bump hypersync-client to 1.3.0 for the streaming v2 release
JonoPrest Jun 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ jobs:
- name: Test
run: cargo test

integration_tests:
runs-on: ubuntu-latest
timeout-minutes: 20
# The live tests need the API token secret, which GitHub does not expose to
# PRs from forks — skip there. They still run on push to main and on PRs from
# branches within this repo.
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- uses: Swatinem/rust-cache@v2
- name: Run live integration tests
env:
ENVIO_API_TOKEN: ${{ secrets.ENVIO_API_TOKEN }}
run: cargo test -p hypersync-client --test api_test -- --ignored

package:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"examples/call_watch",
"examples/call_decode_output",
"examples/height_stream",
"examples/tune_stream",
]

[workspace.dependencies]
Expand Down
537 changes: 537 additions & 0 deletions docs/STREAMING.md

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions examples/tune_stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "tune_stream"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
hypersync-client = { path = "../../hypersync-client" }

tokio = { version = "1", features = ["full"] }
serde_json = "1"
anyhow = "1.0.100"
env_logger = "0.11"
log = "0.4"
14 changes: 14 additions & 0 deletions examples/tune_stream/query.example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"from_block": 18000000,
"to_block": 18100000,
"logs": [
{
"topics": [
["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
]
}
],
"field_selection": {
"log": ["data", "topic0", "topic1", "topic2", "topic3"]
}
}
256 changes: 256 additions & 0 deletions examples/tune_stream/src/main.rs

@JasoonS JasoonS Jun 8, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty cool!

Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
//! `tune_stream` — a config-sweep CLI for the HyperSync streaming engine.
//!
//! Give it a query (JSON) and it runs the stream under a grid of `StreamConfig`s,
//! printing a comparison table of the aggregate metrics so you can pick the best
//! one (highest throughput, response sizes near `response_bytes_target`, low
//! truncation). Because it takes a query JSON it is usable by any client
//! regardless of language.
//!
//! Usage:
//! ENVIO_API_TOKEN=... cargo run -p tune_stream -- <query.json> [--single]
//!
//! Args / env:
//! <query.json> Path to a JSON-serialised `Query` (default: `query.json`).
//! Set a `fromBlock`/`toBlock` for a bounded benchmark.
//! --single Run one config (the default) and print a detailed report
//! instead of the sweep table.
//! CHAIN_ID=<n> Chain to query (default: 1 / eth mainnet).
//! HYPERSYNC_URL=... Override the server URL (otherwise derived from CHAIN_ID).
//! TUNE_DEBUG=1 Emit one `log::debug!` line per request (needs
//! `RUST_LOG=debug`).

use std::sync::Arc;

use hypersync_client::{
net_types::Query, Client, RequestStats, StreamConfig, StreamMetrics, StreamObserver,
StreamSummary, SIZE_BUCKET_LABELS,
};

/// Observer that aggregates into a [`StreamMetrics`] handle and, when `debug` is
/// set, logs one line per request.
struct TuneObserver {
metrics: Arc<StreamMetrics>,
debug: bool,
}

impl StreamObserver for TuneObserver {
fn on_request(&self, s: &RequestStats) {
if self.debug {
log::debug!(
"req {}..{} -> {} | {} bytes ({:.2}x) | {} blocks | {:?} | trunc={} | {:?}",
s.from_block,
s.requested_end,
s.next_block,
s.response_bytes,
s.size_ratio,
s.actual_blocks,
s.kind,
s.truncated,
s.duration,
);
}
self.metrics.on_request(s);
}

fn on_progress(&self, in_flight: u64, buffered_bytes: u64) {
self.metrics.on_progress(in_flight, buffered_bytes);
}

fn on_finish(&self, summary: &StreamSummary) {
self.metrics.on_finish(summary);
}
}

/// One config in the sweep grid.
struct Variant {
label: String,
config: StreamConfig,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();

let mut args = std::env::args().skip(1);
let mut query_path = String::from("query.json");
let mut single = false;
for arg in args.by_ref() {
match arg.as_str() {
"--single" => single = true,
other => query_path = other.to_string(),
}
}

let query_json = std::fs::read_to_string(&query_path)
.map_err(|e| anyhow::anyhow!("failed to read query file '{query_path}': {e}"))?;
let query: Query = serde_json::from_str(&query_json)
.map_err(|e| anyhow::anyhow!("failed to parse query JSON: {e}"))?;

let chain_id: u64 = std::env::var("CHAIN_ID")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let api_token = std::env::var("ENVIO_API_TOKEN")
.map_err(|_| anyhow::anyhow!("ENVIO_API_TOKEN env var is required"))?;

let mut builder = Client::builder().chain_id(chain_id).api_token(api_token);
if let Ok(url) = std::env::var("HYPERSYNC_URL") {
builder = builder.url(url);
}
let client = builder.build()?;

let debug = std::env::var("TUNE_DEBUG").as_deref() == Ok("1");

let variants = if single {
vec![Variant {
label: "default".to_string(),
config: StreamConfig::default(),
}]
} else {
build_grid()
};

println!(
"Running {} config(s) against '{query_path}' on chain {chain_id}\n",
variants.len()
);

let mut rows = Vec::new();
for variant in &variants {
eprintln!("running: {} ...", variant.label);
let summary = run_config(&client, query.clone(), variant.config.clone(), debug).await?;
if single {
print_report(&variant.label, &variant.config, &summary);
}
rows.push((variant.label.clone(), summary));
}

if !single {
print_table(&rows);
}

Ok(())
}

/// The default sweep grid: a small cross-product of the dynamic knobs.
fn build_grid() -> Vec<Variant> {
let targets = [200_000u64, 400_000, 800_000];
let concurrencies = [5usize, 10, 20];
let mut variants = Vec::new();
for &target in &targets {
for &concurrency in &concurrencies {
variants.push(Variant {
label: format!("t={}k c={}", target / 1000, concurrency),
config: StreamConfig {
response_bytes_target: target,
concurrency,
..Default::default()
},
});
}
}
variants
}

async fn run_config(
client: &Client,
query: Query,
config: StreamConfig,
debug: bool,
) -> anyhow::Result<StreamSummary> {
let metrics = Arc::new(StreamMetrics::new());
let observer: Arc<dyn StreamObserver> = Arc::new(TuneObserver {
metrics: metrics.clone(),
debug,
});
let mut rx = client
.stream_arrow_with_observer(query, config, observer)
.await?;
while let Some(res) = rx.recv().await {
// Drain; we only care about the metrics, not the payload.
let _ = res?;
}
Ok(metrics.summary())
}

fn print_table(rows: &[(String, StreamSummary)]) {
println!(
"{:<14} {:>6} {:>7} {:>10} {:>9} {:>9} {:>9} {:>9}",
"config", "reqs", "trunc%", "blocks/s", "MB/s", "ratio", "mblocks", "maxbuf"
);
println!("{}", "-".repeat(80));
for (label, s) in rows {
println!(
"{:<14} {:>6} {:>6.1}% {:>10.0} {:>9.2} {:>9.2} {:>9.0} {:>8}M",
label,
s.num_requests,
s.truncation_rate * 100.0,
s.blocks_per_sec,
s.bytes_per_sec / 1_000_000.0,
s.mean_size_ratio,
s.mean_blocks,
s.max_buffered_bytes_observed / 1_000_000,
);
}
}

fn print_report(label: &str, config: &StreamConfig, s: &StreamSummary) {
println!("===== {label} =====");
println!(
"config: target={} concurrency={} batch_size={} max_batch_size={:?} max_buffered_bytes={:?}",
config.response_bytes_target,
config.concurrency,
config.batch_size,
config.max_batch_size,
config.max_buffered_bytes,
);
println!("requests: {}", s.num_requests);
println!(
"truncated: {} ({:.1}%)",
s.num_truncated,
s.truncation_rate * 100.0
);
println!("total bytes: {}", s.total_bytes);
println!("total blocks: {}", s.total_blocks);
println!("wall clock: {:?}", s.wall_clock);
println!("blocks/s: {:.0}", s.blocks_per_sec);
println!("MB/s: {:.2}", s.bytes_per_sec / 1_000_000.0);
println!("mean size ratio: {:.3}", s.mean_size_ratio);
println!(
"blocks/req: min={} mean={:.0} max={}",
s.min_blocks, s.mean_blocks, s.max_blocks
);
println!("mean bytes/blk: {:.1}", s.mean_bytes_per_block);
println!("mean in-flight: {:.2}", s.mean_in_flight);
println!("max buffered: {} bytes", s.max_buffered_bytes_observed);
println!("frontier/gap: {} / {}", s.num_frontier, s.num_gap_fill);
println!("size-vs-target histogram:");
for (label, count) in SIZE_BUCKET_LABELS.iter().zip(s.size_histogram.iter()) {
println!(" {label:>9}x: {count}");
}
println!();
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn example_query_deserializes() {
let path = concat!(env!("CARGO_MANIFEST_DIR"), "/query.example.json");
let json = std::fs::read_to_string(path).unwrap();
let query: Query = serde_json::from_str(&json).unwrap();
assert_eq!(query.from_block, 18_000_000);
assert_eq!(query.to_block, Some(18_100_000));
assert_eq!(query.logs.len(), 1);
assert_eq!(query.field_selection.log.len(), 5);
}

#[test]
fn grid_is_non_empty_and_distinct() {
let grid = build_grid();
assert_eq!(grid.len(), 9);
let labels: std::collections::HashSet<_> = grid.iter().map(|v| v.label.clone()).collect();
assert_eq!(labels.len(), grid.len(), "labels are unique");
}
}
2 changes: 1 addition & 1 deletion hypersync-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hypersync-client"
version = "1.2.0"
version = "1.3.0"
edition = "2021"
description = "client library for hypersync"
license = "MPL-2.0"
Expand Down
Loading
Loading