-
Notifications
You must be signed in to change notification settings - Fork 9
Redesign streaming engine: per-request scheduling with local density projection #131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
d04f2db
Add streaming engine v2 design doc
JonoPrest 05ca081
Refine streaming design doc per review
JonoPrest a55adf6
Pin down max_buffered_bytes default in design doc
JonoPrest 64e6ce9
Keep max_batch_size as an optional per-chunk block cap
JonoPrest f306caf
Add observability & tuning section to streaming design doc
JonoPrest 75c9466
Finalize observability API in design doc
JonoPrest a23dfc4
Add streaming metrics module (RequestStats, StreamMetrics, StreamObse…
JonoPrest b2da589
Rewrite streaming engine: earliest-hole-first scheduler (v2)
JonoPrest d254ec9
Add tune_stream config-sweep example
JonoPrest e71103c
Add v2 streaming parity tests to api_test
JonoPrest dbad716
Update streaming design doc to match the v2 implementation
JonoPrest 034fd4b
Move streaming design doc into docs/
JonoPrest 9de15b9
Raise stream truncation-warning threshold to 100
JonoPrest 723edf2
Document FetchResult and Scheduler fields
JonoPrest 9e06f2d
Adaptive reorder buffer, workload presets, quieter truncation warning
JonoPrest f295027
Add live all-blocks contiguity assertions to api_test
JonoPrest 155c242
Address CodeRabbit review on PR #131
JonoPrest d188605
Honor max_buffered_bytes = 0 verbatim (CodeRabbit)
JonoPrest d472517
Repurpose string-param decode test off the deprecated mev-commit chain
JonoPrest 384e85c
ci: add live integration-tests job
JonoPrest 0140dd0
Bump hypersync-client to 1.3.0 for the streaming v2 release
JonoPrest File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| [package] | ||
| name = "tune_stream" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
| publish = false | ||
|
|
||
| [dependencies] | ||
| hypersync-client = { path = "../../hypersync-client" } | ||
|
|
||
| tokio = { version = "1", features = ["full"] } | ||
| serde_json = "1" | ||
| anyhow = "1.0.100" | ||
| env_logger = "0.11" | ||
| log = "0.4" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| { | ||
| "from_block": 18000000, | ||
| "to_block": 18100000, | ||
| "logs": [ | ||
| { | ||
| "topics": [ | ||
| ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] | ||
| ] | ||
| } | ||
| ], | ||
| "field_selection": { | ||
| "log": ["data", "topic0", "topic1", "topic2", "topic3"] | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,256 @@ | ||
| //! `tune_stream` — a config-sweep CLI for the HyperSync streaming engine. | ||
| //! | ||
| //! Give it a query (JSON) and it runs the stream under a grid of `StreamConfig`s, | ||
| //! printing a comparison table of the aggregate metrics so you can pick the best | ||
| //! one (highest throughput, response sizes near `response_bytes_target`, low | ||
| //! truncation). Because it takes a query JSON it is usable by any client | ||
| //! regardless of language. | ||
| //! | ||
| //! Usage: | ||
| //! ENVIO_API_TOKEN=... cargo run -p tune_stream -- <query.json> [--single] | ||
| //! | ||
| //! Args / env: | ||
| //! <query.json> Path to a JSON-serialised `Query` (default: `query.json`). | ||
| //! Set a `fromBlock`/`toBlock` for a bounded benchmark. | ||
| //! --single Run one config (the default) and print a detailed report | ||
| //! instead of the sweep table. | ||
| //! CHAIN_ID=<n> Chain to query (default: 1 / eth mainnet). | ||
| //! HYPERSYNC_URL=... Override the server URL (otherwise derived from CHAIN_ID). | ||
| //! TUNE_DEBUG=1 Emit one `log::debug!` line per request (needs | ||
| //! `RUST_LOG=debug`). | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use hypersync_client::{ | ||
| net_types::Query, Client, RequestStats, StreamConfig, StreamMetrics, StreamObserver, | ||
| StreamSummary, SIZE_BUCKET_LABELS, | ||
| }; | ||
|
|
||
| /// Observer that aggregates into a [`StreamMetrics`] handle and, when `debug` is | ||
| /// set, logs one line per request. | ||
| struct TuneObserver { | ||
| metrics: Arc<StreamMetrics>, | ||
| debug: bool, | ||
| } | ||
|
|
||
| impl StreamObserver for TuneObserver { | ||
| fn on_request(&self, s: &RequestStats) { | ||
| if self.debug { | ||
| log::debug!( | ||
| "req {}..{} -> {} | {} bytes ({:.2}x) | {} blocks | {:?} | trunc={} | {:?}", | ||
| s.from_block, | ||
| s.requested_end, | ||
| s.next_block, | ||
| s.response_bytes, | ||
| s.size_ratio, | ||
| s.actual_blocks, | ||
| s.kind, | ||
| s.truncated, | ||
| s.duration, | ||
| ); | ||
| } | ||
| self.metrics.on_request(s); | ||
| } | ||
|
|
||
| fn on_progress(&self, in_flight: u64, buffered_bytes: u64) { | ||
| self.metrics.on_progress(in_flight, buffered_bytes); | ||
| } | ||
|
|
||
| fn on_finish(&self, summary: &StreamSummary) { | ||
| self.metrics.on_finish(summary); | ||
| } | ||
| } | ||
|
|
||
| /// One config in the sweep grid. | ||
| struct Variant { | ||
| label: String, | ||
| config: StreamConfig, | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> anyhow::Result<()> { | ||
| env_logger::init(); | ||
|
|
||
| let mut args = std::env::args().skip(1); | ||
| let mut query_path = String::from("query.json"); | ||
| let mut single = false; | ||
| for arg in args.by_ref() { | ||
| match arg.as_str() { | ||
| "--single" => single = true, | ||
| other => query_path = other.to_string(), | ||
| } | ||
| } | ||
|
|
||
| let query_json = std::fs::read_to_string(&query_path) | ||
| .map_err(|e| anyhow::anyhow!("failed to read query file '{query_path}': {e}"))?; | ||
| let query: Query = serde_json::from_str(&query_json) | ||
| .map_err(|e| anyhow::anyhow!("failed to parse query JSON: {e}"))?; | ||
|
|
||
| let chain_id: u64 = std::env::var("CHAIN_ID") | ||
| .ok() | ||
| .and_then(|s| s.parse().ok()) | ||
| .unwrap_or(1); | ||
| let api_token = std::env::var("ENVIO_API_TOKEN") | ||
| .map_err(|_| anyhow::anyhow!("ENVIO_API_TOKEN env var is required"))?; | ||
|
|
||
| let mut builder = Client::builder().chain_id(chain_id).api_token(api_token); | ||
| if let Ok(url) = std::env::var("HYPERSYNC_URL") { | ||
| builder = builder.url(url); | ||
| } | ||
| let client = builder.build()?; | ||
|
|
||
| let debug = std::env::var("TUNE_DEBUG").as_deref() == Ok("1"); | ||
|
|
||
| let variants = if single { | ||
| vec![Variant { | ||
| label: "default".to_string(), | ||
| config: StreamConfig::default(), | ||
| }] | ||
| } else { | ||
| build_grid() | ||
| }; | ||
|
|
||
| println!( | ||
| "Running {} config(s) against '{query_path}' on chain {chain_id}\n", | ||
| variants.len() | ||
| ); | ||
|
|
||
| let mut rows = Vec::new(); | ||
| for variant in &variants { | ||
| eprintln!("running: {} ...", variant.label); | ||
| let summary = run_config(&client, query.clone(), variant.config.clone(), debug).await?; | ||
| if single { | ||
| print_report(&variant.label, &variant.config, &summary); | ||
| } | ||
| rows.push((variant.label.clone(), summary)); | ||
| } | ||
|
|
||
| if !single { | ||
| print_table(&rows); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// The default sweep grid: a small cross-product of the dynamic knobs. | ||
| fn build_grid() -> Vec<Variant> { | ||
| let targets = [200_000u64, 400_000, 800_000]; | ||
| let concurrencies = [5usize, 10, 20]; | ||
| let mut variants = Vec::new(); | ||
| for &target in &targets { | ||
| for &concurrency in &concurrencies { | ||
| variants.push(Variant { | ||
| label: format!("t={}k c={}", target / 1000, concurrency), | ||
| config: StreamConfig { | ||
| response_bytes_target: target, | ||
| concurrency, | ||
| ..Default::default() | ||
| }, | ||
| }); | ||
| } | ||
| } | ||
| variants | ||
| } | ||
|
|
||
| async fn run_config( | ||
| client: &Client, | ||
| query: Query, | ||
| config: StreamConfig, | ||
| debug: bool, | ||
| ) -> anyhow::Result<StreamSummary> { | ||
| let metrics = Arc::new(StreamMetrics::new()); | ||
| let observer: Arc<dyn StreamObserver> = Arc::new(TuneObserver { | ||
| metrics: metrics.clone(), | ||
| debug, | ||
| }); | ||
| let mut rx = client | ||
| .stream_arrow_with_observer(query, config, observer) | ||
| .await?; | ||
| while let Some(res) = rx.recv().await { | ||
| // Drain; we only care about the metrics, not the payload. | ||
| let _ = res?; | ||
| } | ||
| Ok(metrics.summary()) | ||
| } | ||
|
|
||
| fn print_table(rows: &[(String, StreamSummary)]) { | ||
| println!( | ||
| "{:<14} {:>6} {:>7} {:>10} {:>9} {:>9} {:>9} {:>9}", | ||
| "config", "reqs", "trunc%", "blocks/s", "MB/s", "ratio", "mblocks", "maxbuf" | ||
| ); | ||
| println!("{}", "-".repeat(80)); | ||
| for (label, s) in rows { | ||
| println!( | ||
| "{:<14} {:>6} {:>6.1}% {:>10.0} {:>9.2} {:>9.2} {:>9.0} {:>8}M", | ||
| label, | ||
| s.num_requests, | ||
| s.truncation_rate * 100.0, | ||
| s.blocks_per_sec, | ||
| s.bytes_per_sec / 1_000_000.0, | ||
| s.mean_size_ratio, | ||
| s.mean_blocks, | ||
| s.max_buffered_bytes_observed / 1_000_000, | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| fn print_report(label: &str, config: &StreamConfig, s: &StreamSummary) { | ||
| println!("===== {label} ====="); | ||
| println!( | ||
| "config: target={} concurrency={} batch_size={} max_batch_size={:?} max_buffered_bytes={:?}", | ||
| config.response_bytes_target, | ||
| config.concurrency, | ||
| config.batch_size, | ||
| config.max_batch_size, | ||
| config.max_buffered_bytes, | ||
| ); | ||
| println!("requests: {}", s.num_requests); | ||
| println!( | ||
| "truncated: {} ({:.1}%)", | ||
| s.num_truncated, | ||
| s.truncation_rate * 100.0 | ||
| ); | ||
| println!("total bytes: {}", s.total_bytes); | ||
| println!("total blocks: {}", s.total_blocks); | ||
| println!("wall clock: {:?}", s.wall_clock); | ||
| println!("blocks/s: {:.0}", s.blocks_per_sec); | ||
| println!("MB/s: {:.2}", s.bytes_per_sec / 1_000_000.0); | ||
| println!("mean size ratio: {:.3}", s.mean_size_ratio); | ||
| println!( | ||
| "blocks/req: min={} mean={:.0} max={}", | ||
| s.min_blocks, s.mean_blocks, s.max_blocks | ||
| ); | ||
| println!("mean bytes/blk: {:.1}", s.mean_bytes_per_block); | ||
| println!("mean in-flight: {:.2}", s.mean_in_flight); | ||
| println!("max buffered: {} bytes", s.max_buffered_bytes_observed); | ||
| println!("frontier/gap: {} / {}", s.num_frontier, s.num_gap_fill); | ||
| println!("size-vs-target histogram:"); | ||
| for (label, count) in SIZE_BUCKET_LABELS.iter().zip(s.size_histogram.iter()) { | ||
| println!(" {label:>9}x: {count}"); | ||
| } | ||
| println!(); | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
||
| #[test] | ||
| fn example_query_deserializes() { | ||
| let path = concat!(env!("CARGO_MANIFEST_DIR"), "/query.example.json"); | ||
| let json = std::fs::read_to_string(path).unwrap(); | ||
| let query: Query = serde_json::from_str(&json).unwrap(); | ||
| assert_eq!(query.from_block, 18_000_000); | ||
| assert_eq!(query.to_block, Some(18_100_000)); | ||
| assert_eq!(query.logs.len(), 1); | ||
| assert_eq!(query.field_selection.log.len(), 5); | ||
| } | ||
|
|
||
| #[test] | ||
| fn grid_is_non_empty_and_distinct() { | ||
| let grid = build_grid(); | ||
| assert_eq!(grid.len(), 9); | ||
| let labels: std::collections::HashSet<_> = grid.iter().map(|v| v.label.clone()).collect(); | ||
| assert_eq!(labels.len(), grid.len(), "labels are unique"); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty cool!