diff --git a/.github/workflows/build_and_verify.yml b/.github/workflows/build_and_verify.yml index c9fe17ff3..1caa7ddfe 100644 --- a/.github/workflows/build_and_verify.yml +++ b/.github/workflows/build_and_verify.yml @@ -156,7 +156,7 @@ jobs: - uses: ./.github/actions/cargo-cache - name: Hypersync Health Test - run: cargo test --features integration_tests + run: cargo test --features hypersync_health # Template tests - verify envio templates can be initialized and built template-tests: diff --git a/CLAUDE.md b/CLAUDE.md index 9cf8e1140..ae2ebe2dd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,5 +1,6 @@ - Use `pnpm` over `npm`/`npx`. - Always use single assert to check the whole value instead of multiple asserts for every field. +- Never use `contains` in test assertions — assert the full expected value. Use `assert_eq!` or `insta::assert_snapshot!` for error messages. ## Comments diff --git a/Cargo.lock b/Cargo.lock index 87a1409fa..da0c33fce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,7 @@ dependencies = [ "atoi", "base64 0.22.1", "chrono", + "comfy-table", "half", "lexical-core", "num-traits", @@ -1019,6 +1020,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "comfy-table" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958c5d6ecf1f214b4c2bbbbf6ab9523a864bd136dcf71a7e8904799acfe1ad47" +dependencies = [ + "unicode-segmentation", + "unicode-width 0.2.2", +] + [[package]] name = "console" version = "0.15.11" @@ -1592,6 +1603,7 @@ dependencies = [ "alloy-primitives", "anyhow", "arrayvec", + "arrow", "async-recursion", "bollard", "clap", @@ -1610,6 +1622,7 @@ dependencies = [ "inquire", "insta", "itertools 0.11.0", + "json5", "napi", "napi-build", "napi-derive", @@ -1626,8 +1639,8 @@ dependencies = [ "serde_json", "serde_yaml", "sha2", - "strum 0.26.3", - "strum_macros 0.26.4", + "strum", + "strum_macros", "subenum", "tar", "tempdir", @@ -2378,8 +2391,8 @@ dependencies = [ "hypersync-format", 
"schemars", "serde", - "strum 0.27.2", - "strum_macros 0.27.2", + "strum", + "strum_macros", "xxhash-rust", ] @@ -2595,7 +2608,7 @@ dependencies = [ "newline-converter", "thiserror 1.0.69", "unicode-segmentation", - "unicode-width", + "unicode-width 0.1.14", ] [[package]] @@ -2724,6 +2737,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "k256" version = "0.13.4" @@ -4472,32 +4496,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "strum" -version = "0.26.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" -dependencies = [ - "strum_macros 0.26.4", -] - [[package]] name = "strum" version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" - -[[package]] -name = "strum_macros" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.117", + "strum_macros", ] [[package]] @@ -4985,6 +4990,12 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version 
= "0.2.6" diff --git a/packages/cli/Cargo.toml b/packages/cli/Cargo.toml index 915d147c1..c88db2c95 100644 --- a/packages/cli/Cargo.toml +++ b/packages/cli/Cargo.toml @@ -23,10 +23,11 @@ pathdiff = "0.2.3" serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.149", features = ["raw_value"] } serde_yaml = "0.9.19" +json5 = "0.4" regex = "1.12" reqwest = { version = "0.11", features = ["json"] } -strum = { version = "0.26", features = ["derive"] } -strum_macros = "0.26" +strum = { version = "0.27", features = ["derive"] } +strum_macros = "0.27" tokio = { version = "1.49", features = [ "macros", "process", @@ -45,6 +46,7 @@ colored = "2.0.4" thiserror = "1.0.50" fuel-abi-types = "0.7.0" hypersync-client = "1.2.0" +arrow = { version = "57", default-features = false, features = ["prettyprint"] } faster-hex = "0.9" ruint = "1" env_logger = "0.11" @@ -59,7 +61,12 @@ napi = { version = "=3.8.5", features = ["napi10", "async", "serde-json"] } napi-derive = "=3.5.4" [features] -integration_tests = [] +hypersync_health = [] +# Compatibility alias for the previous feature name. Older CI workflows +# (still on main until this PR merges) reference `integration_tests`; +# enabling it must keep the hypersync health check working. Drop after +# main picks up the renamed workflow step. +integration_tests = ["hypersync_health"] [build-dependencies] napi-build = "=2.3.1" diff --git a/packages/cli/CommandLineHelp.md b/packages/cli/CommandLineHelp.md index 0b56c73cc..3e23d9e56 100644 --- a/packages/cli/CommandLineHelp.md +++ b/packages/cli/CommandLineHelp.md @@ -29,6 +29,7 @@ This document contains the help content for the `envio` command-line program. 
* [`envio local db-migrate setup`↴](#envio-local-db-migrate-setup) * [`envio start`↴](#envio-start) * [`envio metrics`↴](#envio-metrics) +* [`envio data`↴](#envio-data) * [`envio skills`↴](#envio-skills) * [`envio skills update`↴](#envio-skills-update) * [`envio tools`↴](#envio-tools) @@ -50,6 +51,7 @@ This document contains the help content for the `envio` command-line program. * `local` — Prepare local environment for envio testing * `start` — Start the indexer. Runs codegen automatically before launching so the on-disk types stay in sync with `config.yaml` and `schema.graphql` * `metrics` — Fetch raw Prometheus metrics from the running indexer's /metrics endpoint +* `data` — Query raw blockchain data — blocks, logs, transactions on EVM chains using the same `where` syntax as indexer filters * `skills` — Manage Envio-provided Claude Code skills under `.claude/skills/` * `tools` — Tools for people and AI agents (search-docs, fetch-docs). Run `envio tools help` for details * `config` — Inspect the indexer config @@ -381,6 +383,29 @@ Fetch raw Prometheus metrics from the running indexer's /metrics endpoint +## `envio data` + +Query raw blockchain data — blocks, logs, transactions on EVM chains using the same `where` syntax as indexer filters. + +Output is TOON (token-oriented) tabular form. + +Example — earliest USDC transfers on Base: `envio data block.number log.srcAddress --chain=base --where='{ block: { number: { _gte: 0 } }, log: { srcAddress: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" } }'` + +Example — current archive height: `envio data knownHeight --chain=arbitrum-one` + +**Usage:** `envio data [OPTIONS] --chain [FIELD]...` + +###### **Arguments:** + +* `` — Fields to fetch (e.g. `block.number`, `log.srcAddress`, `transaction.transactionIndex`). Use `knownHeight` to get the chain's current archive height + +###### **Options:** + +* `--chain ` — Chain id (e.g. `8453`) or kebab-case name (e.g. `base`, `arbitrum-one`). 
Solana is not supported yet +* `--where ` — Filter rows (JSON5: unquoted keys, single quotes, trailing commas, `//` comments). Group fields under `block`, `transaction`, `log`. Match any field with a value, array, or `_eq`/`_in`; numeric fields also take `_gt`/`_gte`/`_lt`/`_lte`. Example: --where='{ block: { number: { _gte: 1000, _lte: 2000 } }, log: { srcAddress: "0xa0b8..." } }' + + + ## `envio skills` Manage Envio-provided Claude Code skills under `.claude/skills/` diff --git a/packages/cli/examples/script.rs b/packages/cli/examples/script.rs index 3f29d060e..2f2bdc6d4 100644 --- a/packages/cli/examples/script.rs +++ b/packages/cli/examples/script.rs @@ -22,13 +22,16 @@ async fn main() -> Result<()> { ) .context("Failed parsing command line arguments")?; - // Guard against accidental misuse — only `script` subcommands go through - // this path. Everything else (init/codegen/dev/start/...) runs via the - // NAPI host and would be missing the JS dispatch side if invoked here. - if !matches!(command_line_args.command, CommandType::Script(_)) { + // Guard against accidental misuse — only commands that finish entirely + // in Rust (no JS dispatch needed) go through this path. Everything else + // (init/codegen/dev/start/...) runs via the NAPI host. + if !matches!( + command_line_args.command, + CommandType::Script(_) | CommandType::Data(_) + ) { anyhow::bail!( - "This example only supports `script` subcommands. Run envio via the NAPI host \ - (packages/envio/bin.mjs) for init/codegen/dev/start/etc." + "This example only supports `script` and `data` subcommands. Run envio via the NAPI \ + host (packages/envio/bin.mjs) for init/codegen/dev/start/etc." 
); } diff --git a/packages/cli/src/cli_args/clap_definitions.rs b/packages/cli/src/cli_args/clap_definitions.rs index ed033045e..fab6f21a5 100644 --- a/packages/cli/src/cli_args/clap_definitions.rs +++ b/packages/cli/src/cli_args/clap_definitions.rs @@ -61,6 +61,18 @@ pub enum CommandType { ///Fetch raw Prometheus metrics from the running indexer's /metrics endpoint Metrics, + ///Query raw blockchain data — blocks, logs, transactions on EVM chains + ///using the same `where` syntax as indexer filters. + /// + ///Output is TOON (token-oriented) tabular form. + /// + ///Example — earliest USDC transfers on Base: + ///`envio data block.number log.srcAddress --chain=base --where='{ block: { number: { _gte: 0 } }, log: { srcAddress: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" } }'` + /// + ///Example — current archive height: + ///`envio data knownHeight --chain=arbitrum-one` + Data(DataArgs), + ///Manage Envio-provided Claude Code skills under `.claude/skills/` #[command(subcommand)] Skills(SkillsSubcommand), @@ -130,6 +142,28 @@ pub enum JsonSchema { Svm, } +#[derive(Debug, Args)] +pub struct DataArgs { + ///Fields to fetch (e.g. `block.number`, `log.srcAddress`, + ///`transaction.transactionIndex`). Use `knownHeight` to get the + ///chain's current archive height. + #[arg(value_name = "FIELD")] + pub fields: Vec, + + ///Chain id (e.g. `8453`) or kebab-case name (e.g. `base`, + ///`arbitrum-one`). Solana is not supported yet. + #[arg(long)] + pub chain: String, + + ///Filter rows (JSON5: unquoted keys, single quotes, trailing commas, + ///`//` comments). Group fields under `block`, `transaction`, `log`. + ///Match any field with a value, array, or `_eq`/`_in`; numeric fields + ///also take `_gt`/`_gte`/`_lt`/`_lte`. Example: + ///--where='{ block: { number: { _gte: 1000, _lte: 2000 } }, log: { srcAddress: "0xa0b8..." 
} }' + #[arg(long = "where")] + pub where_filter: Option, +} + #[derive(Debug, Args)] pub struct DevArgs { ///Force restart: clear the database and re-index from scratch. Required when config/schema/ABI changes are incompatible with the existing indexer state. diff --git a/packages/cli/src/cli_args/snapshots/envio__cli_args__clap_definitions__test__envio_help_snapshot.snap b/packages/cli/src/cli_args/snapshots/envio__cli_args__clap_definitions__test__envio_help_snapshot.snap index 5cdbf1926..f272b2d28 100644 --- a/packages/cli/src/cli_args/snapshots/envio__cli_args__clap_definitions__test__envio_help_snapshot.snap +++ b/packages/cli/src/cli_args/snapshots/envio__cli_args__clap_definitions__test__envio_help_snapshot.snap @@ -12,6 +12,7 @@ Commands: local Prepare local environment for envio testing start Start the indexer. Runs codegen automatically before launching so the on-disk types stay in sync with `config.yaml` and `schema.graphql` metrics Fetch raw Prometheus metrics from the running indexer's /metrics endpoint + data Query raw blockchain data — blocks, logs, transactions on EVM chains using the same `where` syntax as indexer filters skills Manage Envio-provided Claude Code skills under `.claude/skills/` tools Tools for people and AI agents (search-docs, fetch-docs). Run `envio tools help` for details config Inspect the indexer config diff --git a/packages/cli/src/config_parsing/hypersync_endpoints.rs b/packages/cli/src/config_parsing/hypersync_endpoints.rs index 3be5ed75b..56d926b1e 100644 --- a/packages/cli/src/config_parsing/hypersync_endpoints.rs +++ b/packages/cli/src/config_parsing/hypersync_endpoints.rs @@ -34,11 +34,11 @@ mod test { } } -/// Integration tests that require network access. -/// Run with: cargo test --features integration_tests +/// Hypersync endpoint health checks that require network access. 
+/// Run with: cargo test --features hypersync_health #[cfg(test)] -#[cfg(feature = "integration_tests")] -mod integration_tests { +#[cfg(feature = "hypersync_health")] +mod hypersync_health { use super::{network_to_hypersync_url, HypersyncChain}; use crate::scripts::print_missing_networks::Diff; use strum::IntoEnumIterator; diff --git a/packages/cli/src/data/chain.rs b/packages/cli/src/data/chain.rs new file mode 100644 index 000000000..ae4b4d5ae --- /dev/null +++ b/packages/cli/src/data/chain.rs @@ -0,0 +1,93 @@ +use anyhow::{anyhow, Result}; +use std::str::FromStr; + +use crate::config_parsing::chain_helpers::Network; + +#[derive(Debug, Clone)] +pub struct Chain { + pub base_url: String, + pub display: String, +} + +pub fn resolve(input: &str) -> Result { + let normalized = input.trim().to_ascii_lowercase(); + + let unsupported_family = match normalized.as_str() { + "solana" | "svm" => Some("Solana"), + "fuel" | "fuel-testnet" => Some("Fuel"), + _ => None, + }; + if let Some(family) = unsupported_family { + return Err(anyhow!( + "`--chain={input}` is not supported yet.\n\ + {family} support is on the roadmap. For now use an EVM chain (e.g. `--chain=base`).", + )); + } + + let chain_id = if let Ok(id) = normalized.parse::() { + id + } else { + let network = Network::from_str(&normalized).map_err(|_| { + anyhow!( + "Unknown chain `{input}`. Pass a numeric chain id (e.g. `--chain=8453`) or\n\ + a kebab-case network name (e.g. `--chain=base`, `--chain=arbitrum-one`)." 
+ ) + })?; + network.get_network_id() + }; + + Ok(Chain { + base_url: format!("https://{chain_id}.hypersync.xyz"), + display: normalized, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn resolves_evm_by_name() { + let c = resolve("base").unwrap(); + assert_eq!( + (c.base_url.as_str(), c.display.as_str()), + ("https://8453.hypersync.xyz", "base") + ); + } + + #[test] + fn resolves_evm_by_id() { + let c = resolve("42161").unwrap(); + assert_eq!( + (c.base_url.as_str(), c.display.as_str()), + ("https://42161.hypersync.xyz", "42161") + ); + } + + #[test] + fn display_normalizes_user_input() { + let c = resolve(" BASE ").unwrap(); + assert_eq!(c.display, "base"); + } + + #[test] + fn fuel_errors_not_supported() { + let err = resolve("fuel").unwrap_err().to_string(); + insta::assert_snapshot!(err, @r#"`--chain=fuel` is not supported yet. +Fuel support is on the roadmap. For now use an EVM chain (e.g. `--chain=base`)."#); + } + + #[test] + fn solana_errors_not_supported() { + let err = resolve("solana").unwrap_err().to_string(); + insta::assert_snapshot!(err, @r#"`--chain=solana` is not supported yet. +Solana support is on the roadmap. For now use an EVM chain (e.g. `--chain=base`)."#); + } + + #[test] + fn unknown_chain_errors_with_examples() { + let err = resolve("bogus-network").unwrap_err().to_string(); + insta::assert_snapshot!(err, @r#"Unknown chain `bogus-network`. Pass a numeric chain id (e.g. `--chain=8453`) or +a kebab-case network name (e.g. 
`--chain=base`, `--chain=arbitrum-one`)."#); + } +} diff --git a/packages/cli/src/data/client_filter.rs b/packages/cli/src/data/client_filter.rs new file mode 100644 index 000000000..22d690fb2 --- /dev/null +++ b/packages/cli/src/data/client_filter.rs @@ -0,0 +1,450 @@ +use anyhow::{anyhow, bail, Result}; +use arrow::array::{Array, AsArray, RecordBatch}; +use arrow::datatypes::DataType; +use hypersync_client::ArrowResponse; +use ruint::aliases::U256; +use serde_json::Value; + +use super::mapping::{Section, ValueKind}; +use super::where_filter::{ClientFilter, CmpOp, Cond}; + +/// Per-section keep masks, aligned with the row order produced when iterating a +/// section's record batches in order. `None` means the section has no client +/// filters and every row is kept. +#[derive(Default)] +pub struct Masks { + pub block: Option>, + pub transaction: Option>, + pub log: Option>, +} + +pub fn compute_masks(response: &ArrowResponse, filters: &[ClientFilter]) -> Result { + let mut masks = Masks::default(); + if filters.is_empty() { + return Ok(masks); + } + + for section in [Section::Block, Section::Transaction, Section::Log] { + let compiled = filters + .iter() + .filter(|f| f.field.section() == section) + .map(CompiledFilter::compile) + .collect::>>()?; + if compiled.is_empty() { + continue; + } + let batches = match section { + Section::Block => &response.data.blocks, + Section::Transaction => &response.data.transactions, + Section::Log => &response.data.logs, + }; + let mask = compute_section_mask(batches, &compiled)?; + match section { + Section::Block => masks.block = Some(mask), + Section::Transaction => masks.transaction = Some(mask), + Section::Log => masks.log = Some(mask), + } + } + + Ok(masks) +} + +struct CompiledFilter { + column: String, + kind: ValueKind, + conds: Vec, +} + +enum CompiledCond { + In(Vec), + Cmp(CmpOp, U256), +} + +impl CompiledFilter { + fn compile(f: &ClientFilter) -> Result { + let kind = f.field.spec().value_kind; + let label = format!( 
+ "{}.{}", + f.field.section().as_indexer_str(), + f.field.camel_name(), + ); + let conds = f + .conds + .iter() + .map(|cond| match cond { + Cond::In(vals) => Ok(CompiledCond::In( + vals.iter() + .map(|v| filter_cell(v, kind, &label)) + .collect::>>()?, + )), + Cond::Cmp(op, v) => Ok(CompiledCond::Cmp(*op, filter_u256(v, &label)?)), + }) + .collect::>>()?; + Ok(Self { + column: f.field.column_name(), + kind, + conds, + }) + } +} + +fn compute_section_mask(batches: &[RecordBatch], filters: &[CompiledFilter]) -> Result> { + let mut mask = Vec::new(); + for batch in batches { + let cols: Vec<&dyn Array> = filters + .iter() + .map(|f| { + batch + .column_by_name(&f.column) + .map(|c| c.as_ref()) + .ok_or_else(|| { + anyhow!("filter column `{}` missing from query response", f.column) + }) + }) + .collect::>>()?; + for row in 0..batch.num_rows() { + let keep = filters.iter().zip(cols.iter()).all(|(f, col)| { + let cell = read_cell(*col, row, f.kind); + cell_passes(&cell, &f.conds) + }); + mask.push(keep); + } + } + Ok(mask) +} + +#[derive(Debug, Clone, PartialEq)] +enum Cell { + Num(U256), + Hex(String), + Bool(bool), + Null, +} + +fn read_cell(col: &dyn Array, row: usize, kind: ValueKind) -> Cell { + if col.is_null(row) { + return Cell::Null; + } + match col.data_type() { + DataType::UInt64 => Cell::Num(U256::from( + col.as_primitive::() + .value(row), + )), + DataType::UInt8 => Cell::Num(U256::from( + col.as_primitive::().value(row), + )), + DataType::Boolean => Cell::Bool(col.as_boolean().value(row)), + DataType::Binary => { + let bytes = col.as_binary::().value(row); + match kind { + ValueKind::Numeric => Cell::Num(U256::try_from_be_slice(bytes).unwrap_or_default()), + ValueKind::Hex | ValueKind::Bool => Cell::Hex(faster_hex::hex_string(bytes)), + } + } + dt => unreachable!("unexpected arrow data type {dt:?} for envio data column"), + } +} + +fn cell_passes(cell: &Cell, conds: &[CompiledCond]) -> bool { + conds.iter().all(|cond| match cond { + CompiledCond::In(vals) 
=> vals.iter().any(|v| v == cell), + CompiledCond::Cmp(op, target) => match cell { + Cell::Num(n) => match op { + CmpOp::Gt => n > target, + CmpOp::Gte => n >= target, + CmpOp::Lt => n < target, + CmpOp::Lte => n <= target, + }, + _ => false, + }, + }) +} + +fn filter_cell(v: &Value, kind: ValueKind, label: &str) -> Result { + match kind { + ValueKind::Numeric => Ok(Cell::Num(filter_u256(v, label)?)), + ValueKind::Hex => match v { + Value::String(s) => Ok(Cell::Hex(normalize_hex(s, label)?)), + other => bail!( + "Filter on `{label}` expects a hex string, got {}", + json_type(other), + ), + }, + ValueKind::Bool => match v { + Value::Bool(b) => Ok(Cell::Bool(*b)), + other => bail!( + "Filter on `{label}` expects true or false, got {}", + json_type(other), + ), + }, + } +} + +fn filter_u256(v: &Value, label: &str) -> Result { + match v { + Value::Number(n) => match n.as_u64() { + Some(u) => Ok(U256::from(u)), + None => U256::from_str_radix(&n.to_string(), 10) + .map_err(|_| anyhow!("Filter on `{label}` expects an integer, got {n}")), + }, + Value::String(s) => parse_u256_str(s) + .ok_or_else(|| anyhow!("Filter on `{label}` expects an integer, got \"{s}\"")), + other => bail!( + "Filter on `{label}` expects a number, got {}", + json_type(other), + ), + } +} + +fn parse_u256_str(s: &str) -> Option { + let s = s.trim(); + match s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) { + Some(hex) => U256::from_str_radix(hex, 16).ok(), + None => U256::from_str_radix(s, 10).ok(), + } +} + +fn normalize_hex(s: &str, label: &str) -> Result { + let t = s.trim(); + let hex = t + .strip_prefix("0x") + .or_else(|| t.strip_prefix("0X")) + .unwrap_or(t); + if hex.is_empty() || !hex.bytes().all(|b| b.is_ascii_hexdigit()) { + bail!("Filter on `{label}` expects a hex string, got \"{s}\""); + } + Ok(hex.to_ascii_lowercase()) +} + +fn json_type(v: &Value) -> &'static str { + match v { + Value::Null => "null", + Value::Bool(_) => "bool", + Value::Number(_) => "number", + Value::String(_) 
=> "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::mapping::lookup; + use crate::data::where_filter::WhereFilter; + use arrow::array::{BinaryArray, RecordBatch, UInt64Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use pretty_assertions::assert_eq; + use std::sync::Arc; + + fn client_filters(raw: &str) -> Vec { + WhereFilter::parse(Some(raw)).unwrap().client_filters + } + + fn log_batch_value(values: &[u64]) -> RecordBatch { + let schema = Schema::new(vec![Field::new("block_number", DataType::UInt64, false)]); + RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(UInt64Array::from(values.to_vec()))], + ) + .unwrap() + } + + #[test] + fn numeric_range_mask() { + let filters = client_filters("{ log: { blockNumber: { _gte: 10, _lt: 20 } } }"); + let batches = vec![log_batch_value(&[5, 10, 15, 19, 20, 25])]; + let mask = compute_section_mask( + &batches, + &filters + .iter() + .map(CompiledFilter::compile) + .collect::>>() + .unwrap(), + ) + .unwrap(); + assert_eq!(mask, vec![false, true, true, true, false, false]); + } + + #[test] + fn numeric_in_mask() { + let filters = client_filters("{ transaction: { value: { _in: [100, 300] } } }"); + let schema = Schema::new(vec![Field::new("value", DataType::UInt64, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(UInt64Array::from(vec![100u64, 200, 300, 400]))], + ) + .unwrap(); + let mask = compute_section_mask( + &[batch], + &filters + .iter() + .map(CompiledFilter::compile) + .collect::>>() + .unwrap(), + ) + .unwrap(); + assert_eq!(mask, vec![true, false, true, false]); + } + + #[test] + fn block_number_in_mask() { + let filters = client_filters("{ block: { number: { _in: [10, 12] } } }"); + let schema = Schema::new(vec![Field::new("number", DataType::UInt64, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(UInt64Array::from(vec![10u64, 11, 
12]))], + ) + .unwrap(); + let mask = compute_section_mask( + &[batch], + &filters + .iter() + .map(CompiledFilter::compile) + .collect::>>() + .unwrap(), + ) + .unwrap(); + assert_eq!(mask, vec![true, false, true]); + } + + #[test] + fn hex_eq_mask() { + let filters = client_filters("{ log: { data: '0xABCD' } }"); + let schema = Schema::new(vec![Field::new("data", DataType::Binary, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BinaryArray::from(vec![ + [0xab, 0xcd].as_slice(), + [0x00, 0x01].as_slice(), + ]))], + ) + .unwrap(); + let mask = compute_section_mask( + &[batch], + &filters + .iter() + .map(CompiledFilter::compile) + .collect::>>() + .unwrap(), + ) + .unwrap(); + assert_eq!(mask, vec![true, false]); + } + + #[test] + fn data_field_is_client_side() { + // `log.data` has no Hypersync builder, so it must route to client filters. + let f = WhereFilter::parse(Some("{ log: { data: '0xabcd' } }")).unwrap(); + let routed = ( + f.server_filters.len(), + f.client_filters.len(), + f.client_filters[0].field.section(), + ); + assert_eq!(routed, (0, 1, Section::Log)); + } + + #[test] + fn address_stays_server_side() { + let f = WhereFilter::parse(Some("{ log: { srcAddress: '0xa0b8' } }")).unwrap(); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (1, 0)); + } + + #[test] + fn comparison_on_numeric_field_is_client_side() { + let f = WhereFilter::parse(Some("{ transaction: { value: { _gt: 1000 } } }")).unwrap(); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (0, 1)); + } + + #[test] + fn comparison_on_hex_field_is_rejected() { + let err = WhereFilter::parse(Some("{ log: { srcAddress: { _gt: '0xa' } } }")) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Comparison operators are only supported on numeric fields; `log.srcAddress` is not numeric. 
Use `_eq` or `_in`."); + } + + #[test] + fn lookup_is_used_for_section() { + // sanity: the helper used by classification resolves to the Log section. + assert_eq!( + lookup(Section::Log, "data").unwrap().section(), + Section::Log, + ); + } + + // End-to-end: parse a `--where` with several client-only filters, build an + // Arrow response, then mask + render. Exercises numeric range, `_in`, and a + // big-endian binary numeric comparison across two independent sections. + #[test] + fn advanced_client_filtering_end_to_end() { + use crate::data::field_selection::Selection; + use crate::data::toon::render_arrow_response; + use hypersync_client::{ArrowResponse, ArrowResponseData}; + + let selection = Selection::parse(&[ + "log.blockNumber".into(), + "log.logIndex".into(), + "transaction.value".into(), + ]) + .unwrap(); + + let filter = WhereFilter::parse(Some( + "{ log: { blockNumber: { _gte: 10, _lt: 20 }, logIndex: { _in: [0, 2] } }, \ + transaction: { value: { _gt: 100 } } }", + )) + .unwrap(); + assert_eq!( + (filter.server_filters.len(), filter.client_filters.len()), + (0, 3), + ); + + let log_schema = Schema::new(vec![ + Field::new("block_number", DataType::UInt64, false), + Field::new("log_index", DataType::UInt64, false), + ]); + let logs = RecordBatch::try_new( + Arc::new(log_schema), + vec![ + Arc::new(UInt64Array::from(vec![5u64, 10, 15, 15, 20, 19])), + Arc::new(UInt64Array::from(vec![0u64, 0, 1, 2, 2, 0])), + ], + ) + .unwrap(); + + // `transaction.value` is a numeric field stored as big-endian bytes. 
+ let value_bytes: Vec> = [50u64, 150, 100, 200] + .iter() + .map(|n| n.to_be_bytes().to_vec()) + .collect(); + let tx_schema = Schema::new(vec![Field::new("value", DataType::Binary, false)]); + let transactions = RecordBatch::try_new( + Arc::new(tx_schema), + vec![Arc::new(BinaryArray::from( + value_bytes.iter().map(|v| v.as_slice()).collect::>(), + ))], + ) + .unwrap(); + + let response = ArrowResponse { + archive_height: Some(1000), + next_block: 1000, + total_execution_time: 0, + data: ArrowResponseData { + logs: vec![logs], + transactions: vec![transactions], + ..Default::default() + }, + rollback_guard: None, + }; + + let masks = compute_masks(&response, &filter.client_filters).unwrap(); + let out = render_arrow_response(&selection, &response, &masks); + + assert_eq!( + out, + "logs[3]{blockNumber,logIndex}:\n 10,0\n 15,2\n 19,0\n\ + transactions[2]{value}:\n 150\n 200\n", + ); + } +} diff --git a/packages/cli/src/data/field_selection.rs b/packages/cli/src/data/field_selection.rs new file mode 100644 index 000000000..179de06d9 --- /dev/null +++ b/packages/cli/src/data/field_selection.rs @@ -0,0 +1,189 @@ +use anyhow::{anyhow, Result}; + +use hypersync_client::net_types::FieldSelection as NetFieldSelection; + +use super::mapping::{self, Section, TypedField}; + +#[derive(Debug, Clone, Default)] +pub struct Selection { + pub columns: Vec, + pub known_height: bool, +} + +#[derive(Debug, Clone)] +pub struct Column { + pub section: Section, + pub field: TypedField, + /// The field name exactly as the user typed it, used verbatim in the TOON + /// header so the output mirrors the request (e.g. `block.NUMBER` → `NUMBER`). + pub display_name: String, +} + +impl Selection { + pub fn parse(positionals: &[String]) -> Result { + if positionals.is_empty() { + return Err(anyhow!( + "No fields requested. Pass at least one field like `block.number` or `knownHeight`." 
+ )); + } + + let mut sel = Selection::default(); + for raw in positionals { + if raw == "knownHeight" { + sel.known_height = true; + continue; + } + + let (section_raw, field_raw) = raw.split_once('.').ok_or_else(|| { + anyhow!( + "Bad field `{raw}`. Use `
.` (e.g. `block.number`) or `knownHeight`.\n\ + Valid sections: {sections}.", + sections = mapping::ALLOWED_SECTIONS.join(", "), + ) + })?; + + let section = mapping::parse_section(section_raw).ok_or_else(|| { + anyhow!( + "Unknown section `{section_raw}` in `{raw}`. Valid sections for this chain: {sections}.", + sections = mapping::ALLOWED_SECTIONS.join(", "), + ) + })?; + + let field = mapping::lookup(section, field_raw).ok_or_else(|| { + let valid = mapping::valid_indexer_names(section).join(", "); + anyhow!("Unknown field `{raw}`. Valid `{section_raw}.*` fields: {valid}.") + })?; + + sel.columns.push(Column { + section, + field, + display_name: field_raw.to_string(), + }); + } + + Ok(sel) + } + + pub fn has_data_fields(&self) -> bool { + !self.columns.is_empty() + } + + /// Builds the Hypersync field selection, also requesting `extra` fields so + /// client-side filter fields are fetched even when not selected for output. + pub fn build_net_field_selection_with(&self, extra: &[TypedField]) -> NetFieldSelection { + let mut fs = NetFieldSelection::default(); + for col in &self.columns { + insert_field(&mut fs, col.field); + } + for field in extra { + insert_field(&mut fs, *field); + } + fs + } +} + +fn insert_field(fs: &mut NetFieldSelection, field: TypedField) { + match field { + TypedField::Block(f) => { + fs.block.insert(f); + } + TypedField::Transaction(f) => { + fs.transaction.insert(f); + } + TypedField::Log(f) => { + fs.log.insert(f); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn parses_fields_and_known_height() { + let sel = Selection::parse(&[ + "block.number".into(), + "block.hash".into(), + "log.srcAddress".into(), + "transaction.transactionIndex".into(), + "knownHeight".into(), + ]) + .unwrap(); + + let cols: Vec<(Section, String)> = sel + .columns + .iter() + .map(|c| (c.section, c.field.camel_name())) + .collect(); + + assert_eq!( + (cols, sel.known_height), + ( + vec![ + 
(Section::Block, "number".to_string()), + (Section::Block, "hash".to_string()), + (Section::Log, "srcAddress".to_string()), + (Section::Transaction, "transactionIndex".to_string()), + ], + true, + ), + ); + } + + #[test] + fn known_height_only_has_no_data_fields() { + let sel = Selection::parse(&["knownHeight".into()]).unwrap(); + assert_eq!((sel.has_data_fields(), sel.known_height), (false, true)); + } + + #[test] + fn rejects_missing_dot() { + let err = Selection::parse(&["blocknumber".into()]) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @r#"Bad field `blocknumber`. Use `
.` (e.g. `block.number`) or `knownHeight`. +Valid sections: block, transaction, log."#); + } + + #[test] + fn rejects_unknown_section() { + let err = Selection::parse(&["receipt.txId".into()]) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Unknown section `receipt` in `receipt.txId`. Valid sections for this chain: block, transaction, log."); + } + + #[test] + fn rejects_unknown_field() { + let err = Selection::parse(&["log.foo".into()]) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Unknown field `log.foo`. Valid `log.*` fields: transactionHash, blockHash, blockNumber, transactionIndex, logIndex, srcAddress, data, removed, topic0, topic1, topic2, topic3."); + } + + #[test] + fn accepts_snake_case_fields() { + let sel = Selection::parse(&[ + "block.gas_limit".into(), + "log.src_address".into(), + "transaction.transaction_index".into(), + ]) + .unwrap(); + let names: Vec = sel.columns.iter().map(|c| c.field.camel_name()).collect(); + assert_eq!(names, vec!["gasLimit", "srcAddress", "transactionIndex"]); + } + + #[test] + fn accepts_all_lowercase() { + let sel = Selection::parse(&["block.gaslimit".into(), "log.blocknumber".into()]).unwrap(); + assert_eq!(sel.columns.len(), 2); + } + + #[test] + fn accepts_uppercase() { + let sel = Selection::parse(&["block.GAS_LIMIT".into(), "log.TOPIC0".into()]).unwrap(); + assert_eq!(sel.columns.len(), 2); + } +} diff --git a/packages/cli/src/data/mapping.rs b/packages/cli/src/data/mapping.rs new file mode 100644 index 000000000..e705b27bc --- /dev/null +++ b/packages/cli/src/data/mapping.rs @@ -0,0 +1,407 @@ +use hypersync_client::net_types::{ + block::BlockField, log::LogField, transaction::TransactionField, +}; +use strum::IntoEnumIterator; + +/// How a field's value is parsed, compared, and rendered. Numeric fields are +/// the only ones that support `_gt`/`_gte`/`_lt`/`_lte` comparisons. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ValueKind { + Numeric, + Hex, + Bool, +} + +/// The Hypersync builder a field maps to when pushed server-side. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ServerFilter { + LogAddress, + LogTopic0, + LogTopic1, + LogTopic2, + LogTopic3, + TxFrom, + TxTo, + TxSighash, + TxHash, + TxContractAddress, + TxStatus, + TxType, + BlockHash, + BlockMiner, +} + +/// Everything the `--where` pipeline needs to know about a field: how to match +/// its name, how its values behave client-side, and whether it can be pushed +/// server-side. +pub struct FieldSpec { + pub aliases: &'static [&'static str], + pub value_kind: ValueKind, + pub server: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum Section { + Block, + Transaction, + Log, +} + +impl Section { + pub fn as_indexer_str(self) -> &'static str { + match self { + Section::Block => "block", + Section::Transaction => "transaction", + Section::Log => "log", + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum TypedField { + Block(BlockField), + Transaction(TransactionField), + Log(LogField), +} + +impl TypedField { + pub fn column_name(self) -> String { + match self { + TypedField::Block(f) => f.to_string(), + TypedField::Transaction(f) => f.to_string(), + TypedField::Log(f) => f.to_string(), + } + } + + pub fn camel_name(self) -> String { + to_camel(&self.column_name()) + } + + pub fn section(self) -> Section { + match self { + TypedField::Block(_) => Section::Block, + TypedField::Transaction(_) => Section::Transaction, + TypedField::Log(_) => Section::Log, + } + } + + pub fn spec(self) -> FieldSpec { + FieldSpec { + aliases: self.aliases(), + value_kind: self.value_kind(), + server: self.server_filter(), + } + } + + fn aliases(self) -> &'static [&'static str] { + match self { + TypedField::Log(LogField::Address) => &["srcAddress"], + _ => &[], + } + } + + /// Which Hypersync builder this field maps to, or `None` if it can only be + 
/// filtered client-side. Single source of truth for server-side routing. + fn server_filter(self) -> Option { + use ServerFilter::*; + Some(match self { + TypedField::Log(LogField::Address) => LogAddress, + TypedField::Log(LogField::Topic0) => LogTopic0, + TypedField::Log(LogField::Topic1) => LogTopic1, + TypedField::Log(LogField::Topic2) => LogTopic2, + TypedField::Log(LogField::Topic3) => LogTopic3, + TypedField::Transaction(TransactionField::From) => TxFrom, + TypedField::Transaction(TransactionField::To) => TxTo, + TypedField::Transaction(TransactionField::Sighash) => TxSighash, + TypedField::Transaction(TransactionField::Hash) => TxHash, + TypedField::Transaction(TransactionField::ContractAddress) => TxContractAddress, + TypedField::Transaction(TransactionField::Status) => TxStatus, + TypedField::Transaction(TransactionField::Type) => TxType, + TypedField::Block(BlockField::Hash) => BlockHash, + TypedField::Block(BlockField::Miner) => BlockMiner, + _ => return None, + }) + } + + fn value_kind(self) -> ValueKind { + use ValueKind::{Bool, Hex, Numeric}; + match self { + TypedField::Block(f) => match f { + BlockField::Number + | BlockField::Size + | BlockField::GasLimit + | BlockField::GasUsed + | BlockField::Timestamp + | BlockField::Nonce + | BlockField::Difficulty + | BlockField::TotalDifficulty + | BlockField::BaseFeePerGas + | BlockField::BlobGasUsed + | BlockField::ExcessBlobGas + | BlockField::L1BlockNumber + | BlockField::SendCount => Numeric, + BlockField::Hash + | BlockField::ParentHash + | BlockField::Sha3Uncles + | BlockField::LogsBloom + | BlockField::TransactionsRoot + | BlockField::StateRoot + | BlockField::ReceiptsRoot + | BlockField::Miner + | BlockField::ExtraData + | BlockField::MixHash + | BlockField::Uncles + | BlockField::ParentBeaconBlockRoot + | BlockField::WithdrawalsRoot + | BlockField::Withdrawals + | BlockField::SendRoot => Hex, + }, + TypedField::Transaction(f) => match f { + TransactionField::BlockNumber + | TransactionField::Gas + 
| TransactionField::Nonce + | TransactionField::TransactionIndex + | TransactionField::Value + | TransactionField::CumulativeGasUsed + | TransactionField::EffectiveGasPrice + | TransactionField::GasUsed + | TransactionField::GasPrice + | TransactionField::V + | TransactionField::R + | TransactionField::S + | TransactionField::MaxPriorityFeePerGas + | TransactionField::MaxFeePerGas + | TransactionField::ChainId + | TransactionField::Type + | TransactionField::Status + | TransactionField::YParity + | TransactionField::L1Fee + | TransactionField::L1GasPrice + | TransactionField::L1GasUsed + | TransactionField::L1FeeScalar + | TransactionField::GasUsedForL1 + | TransactionField::MaxFeePerBlobGas + | TransactionField::BlobGasPrice + | TransactionField::BlobGasUsed + | TransactionField::DepositNonce + | TransactionField::DepositReceiptVersion + | TransactionField::Mint + | TransactionField::L1BaseFeeScalar + | TransactionField::L1BlobBaseFee + | TransactionField::L1BlobBaseFeeScalar + | TransactionField::L1BlockNumber => Numeric, + TransactionField::BlockHash + | TransactionField::Hash + | TransactionField::Input + | TransactionField::LogsBloom + | TransactionField::From + | TransactionField::To + | TransactionField::ContractAddress + | TransactionField::Root + | TransactionField::AccessList + | TransactionField::AuthorizationList + | TransactionField::BlobVersionedHashes + | TransactionField::Sighash + | TransactionField::SourceHash => Hex, + }, + TypedField::Log(f) => match f { + LogField::BlockNumber | LogField::TransactionIndex | LogField::LogIndex => Numeric, + LogField::Removed => Bool, + LogField::TransactionHash + | LogField::BlockHash + | LogField::Address + | LogField::Data + | LogField::Topic0 + | LogField::Topic1 + | LogField::Topic2 + | LogField::Topic3 => Hex, + }, + } + } +} + +fn normalize(s: &str) -> String { + s.chars() + .filter(|c| *c != '_') + .map(|c| c.to_ascii_lowercase()) + .collect() +} + +pub fn lookup(section: Section, user_input: &str) -> 
Option { + let key = normalize(user_input); + let matches = |field: TypedField| { + normalize(&field.column_name()) == key + || field.spec().aliases.iter().any(|a| normalize(a) == key) + }; + match section { + Section::Block => BlockField::iter() + .map(TypedField::Block) + .find(|f| matches(*f)), + Section::Transaction => TransactionField::iter() + .map(TypedField::Transaction) + .find(|f| matches(*f)), + Section::Log => LogField::iter().map(TypedField::Log).find(|f| matches(*f)), + } +} + +fn to_camel(snake: &str) -> String { + if snake == "address" { + return "srcAddress".to_string(); + } + let mut out = String::with_capacity(snake.len()); + let mut capitalize_next = false; + for ch in snake.chars() { + if ch == '_' { + capitalize_next = true; + } else if capitalize_next { + out.push(ch.to_ascii_uppercase()); + capitalize_next = false; + } else { + out.push(ch); + } + } + out +} + +pub fn valid_indexer_names(section: Section) -> Vec { + match section { + Section::Block => BlockField::iter().map(|f| to_camel(f.as_ref())).collect(), + Section::Transaction => TransactionField::iter() + .map(|f| to_camel(f.as_ref())) + .collect(), + Section::Log => LogField::iter().map(|f| to_camel(f.as_ref())).collect(), + } +} + +pub fn parse_section(raw: &str) -> Option
{ + match raw { + "block" => Some(Section::Block), + "transaction" => Some(Section::Transaction), + "log" => Some(Section::Log), + _ => None, + } +} + +pub const ALLOWED_SECTIONS: &[&str] = &["block", "transaction", "log"]; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lookup_camel_case() { + assert!(matches!( + lookup(Section::Block, "gasLimit"), + Some(TypedField::Block(BlockField::GasLimit)) + )); + assert!(matches!( + lookup(Section::Transaction, "transactionIndex"), + Some(TypedField::Transaction(TransactionField::TransactionIndex)) + )); + } + + #[test] + fn lookup_snake_case() { + assert!(matches!( + lookup(Section::Block, "gas_limit"), + Some(TypedField::Block(BlockField::GasLimit)) + )); + assert!(matches!( + lookup(Section::Transaction, "transaction_index"), + Some(TypedField::Transaction(TransactionField::TransactionIndex)) + )); + } + + #[test] + fn lookup_all_lowercase() { + assert!(matches!( + lookup(Section::Block, "gaslimit"), + Some(TypedField::Block(BlockField::GasLimit)) + )); + assert!(matches!( + lookup(Section::Block, "sha3uncles"), + Some(TypedField::Block(BlockField::Sha3Uncles)) + )); + } + + #[test] + fn lookup_uppercase_mixed() { + assert!(matches!( + lookup(Section::Block, "GAS_LIMIT"), + Some(TypedField::Block(BlockField::GasLimit)) + )); + assert!(matches!( + lookup(Section::Block, "GASLIMIT"), + Some(TypedField::Block(BlockField::GasLimit)) + )); + } + + #[test] + fn lookup_src_address_alias() { + assert!(matches!( + lookup(Section::Log, "srcAddress"), + Some(TypedField::Log(LogField::Address)) + )); + assert!(matches!( + lookup(Section::Log, "src_address"), + Some(TypedField::Log(LogField::Address)) + )); + assert!(matches!( + lookup(Section::Log, "SRCADDRESS"), + Some(TypedField::Log(LogField::Address)) + )); + } + + #[test] + fn lookup_address_directly() { + assert!(matches!( + lookup(Section::Log, "address"), + Some(TypedField::Log(LogField::Address)) + )); + } + + #[test] + fn lookup_topic_fields() { + 
assert!(matches!( + lookup(Section::Log, "topic0"), + Some(TypedField::Log(LogField::Topic0)) + )); + assert!(matches!( + lookup(Section::Log, "TOPIC3"), + Some(TypedField::Log(LogField::Topic3)) + )); + } + + #[test] + fn lookup_unknown_returns_none() { + assert!(lookup(Section::Block, "bogus").is_none()); + assert!(lookup(Section::Log, "nonexistent").is_none()); + } + + #[test] + fn valid_names_use_camel_case() { + let names = valid_indexer_names(Section::Block); + assert!(names.contains(&"gasLimit".to_string()), "{names:?}"); + assert!(names.contains(&"baseFeePerGas".to_string()), "{names:?}"); + } + + #[test] + fn valid_names_include_src_address() { + let names = valid_indexer_names(Section::Log); + assert!(names.contains(&"srcAddress".to_string()), "{names:?}"); + } + + #[test] + fn column_name_is_snake_case() { + let f = lookup(Section::Block, "gasLimit").unwrap(); + assert_eq!(f.column_name(), "gas_limit"); + } + + #[test] + fn normalize_strips_underscores_and_lowercases() { + assert_eq!(normalize("gas_Limit"), "gaslimit"); + assert_eq!(normalize("GAS_LIMIT"), "gaslimit"); + assert_eq!(normalize("gasLimit"), "gaslimit"); + assert_eq!(normalize("topic0"), "topic0"); + } +} diff --git a/packages/cli/src/data/mod.rs b/packages/cli/src/data/mod.rs new file mode 100644 index 000000000..ca88caa8a --- /dev/null +++ b/packages/cli/src/data/mod.rs @@ -0,0 +1,6 @@ +pub(crate) mod chain; +pub(crate) mod client_filter; +pub(crate) mod field_selection; +pub(crate) mod mapping; +pub(crate) mod toon; +pub(crate) mod where_filter; diff --git a/packages/cli/src/data/toon.rs b/packages/cli/src/data/toon.rs new file mode 100644 index 000000000..76926a6eb --- /dev/null +++ b/packages/cli/src/data/toon.rs @@ -0,0 +1,239 @@ +use std::fmt::Write; + +use arrow::array::{Array, AsArray, RecordBatch}; +use arrow::datatypes::DataType; +use hypersync_client::ArrowResponse; + +use super::client_filter::Masks; +use super::field_selection::{Column, Selection}; +use 
super::mapping::{Section, ValueKind}; + +pub fn render_table(name: &str, columns: &[impl AsRef], rows: &[Vec]) -> String { + let mut out = String::new(); + let _ = write!(out, "{name}[{n}]{{", n = rows.len()); + for (i, c) in columns.iter().enumerate() { + if i > 0 { + out.push(','); + } + out.push_str(c.as_ref()); + } + out.push_str("}:\n"); + for row in rows { + out.push_str(" "); + for (i, cell) in row.iter().enumerate() { + if i > 0 { + out.push(','); + } + out.push_str(&escape_cell(cell)); + } + out.push('\n'); + } + out +} + +fn escape_cell(s: &str) -> String { + let needs_quoting = s.contains(',') + || s.contains('\n') + || s.contains('"') + || s.starts_with(' ') + || s.ends_with(' '); + if !needs_quoting { + return s.to_string(); + } + let escaped = s.replace('\\', "\\\\").replace('"', "\\\""); + format!("\"{escaped}\"") +} + +/// A section's rendered rows, kept separate from the final TOON string so pages +/// can be accumulated before rendering a single table per section. +pub struct SectionTable { + pub section: Section, + pub plural: &'static str, + pub col_names: Vec, + pub rows: Vec>, +} + +pub fn collect_sections( + selection: &Selection, + response: &ArrowResponse, + masks: &Masks, +) -> Vec { + let mut section_order: Vec
= Vec::new(); + for col in &selection.columns { + if !section_order.contains(&col.section) { + section_order.push(col.section); + } + } + + let mut tables = Vec::new(); + for section in §ion_order { + let cols: Vec<&Column> = selection + .columns + .iter() + .filter(|c| c.section == *section) + .collect(); + let col_names: Vec = cols.iter().map(|c| c.display_name.clone()).collect(); + let (plural, batches, mask) = match section { + Section::Block => ("blocks", &response.data.blocks, masks.block.as_deref()), + Section::Transaction => ( + "transactions", + &response.data.transactions, + masks.transaction.as_deref(), + ), + Section::Log => ("logs", &response.data.logs, masks.log.as_deref()), + }; + + let column_keys: Vec = cols.iter().map(|c| c.field.column_name()).collect(); + let column_kinds: Vec = cols.iter().map(|c| c.field.spec().value_kind).collect(); + let rows = extract_rows(batches, &column_keys, &column_kinds, mask); + tables.push(SectionTable { + section: *section, + plural, + col_names, + rows, + }); + } + tables +} + +pub fn render_sections(tables: &[SectionTable]) -> String { + let mut out = String::new(); + for table in tables { + out.push_str(&render_table(table.plural, &table.col_names, &table.rows)); + } + out +} + +#[cfg(test)] +pub fn render_arrow_response( + selection: &Selection, + response: &ArrowResponse, + masks: &Masks, +) -> String { + render_sections(&collect_sections(selection, response, masks)) +} + +fn extract_rows( + batches: &[RecordBatch], + column_keys: &[String], + column_kinds: &[ValueKind], + mask: Option<&[bool]>, +) -> Vec> { + let mut rows = Vec::new(); + let mut row_offset = 0; + for batch in batches { + let arrays: Vec> = column_keys + .iter() + .map(|key| batch.column_by_name(key).map(|c| c.as_ref())) + .collect(); + for row_idx in 0..batch.num_rows() { + let keep = mask.is_none_or(|m| m[row_offset + row_idx]); + if !keep { + continue; + } + let row: Vec = arrays + .iter() + .zip(column_kinds.iter()) + .map(|(arr, kind)| 
match arr { + Some(col) => cell_to_string(*col, row_idx, *kind), + None => String::new(), + }) + .collect(); + rows.push(row); + } + row_offset += batch.num_rows(); + } + rows +} + +fn cell_to_string(col: &dyn Array, row: usize, kind: ValueKind) -> String { + if col.is_null(row) { + return String::new(); + } + match col.data_type() { + DataType::UInt64 => col + .as_primitive::() + .value(row) + .to_string(), + DataType::UInt8 => col + .as_primitive::() + .value(row) + .to_string(), + DataType::Boolean => col.as_boolean().value(row).to_string(), + DataType::Binary => { + let bytes = col.as_binary::().value(row); + match kind { + ValueKind::Numeric => binary_as_decimal(bytes), + ValueKind::Hex | ValueKind::Bool => format!("0x{}", faster_hex::hex_string(bytes)), + } + } + dt => unreachable!("unexpected arrow data type {dt:?} for envio data column"), + } +} + +fn binary_as_decimal(bytes: &[u8]) -> String { + if bytes.is_empty() { + return "0".to_string(); + } + let val = ruint::aliases::U256::try_from_be_slice(bytes).unwrap_or_default(); + val.to_string() +} + +pub fn render_height(value: u64) -> String { + format!("knownHeight: {value}\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn quotes_cells_with_commas() { + let s = render_table("t", &["a"], &[vec!["x,y".into()]]); + assert_eq!(s, "t[1]{a}:\n \"x,y\"\n"); + } + + #[test] + fn header_uses_field_name_as_typed() { + use super::super::client_filter::Masks; + use super::super::field_selection::Selection; + use arrow::array::UInt64Array; + use arrow::datatypes::{Field, Schema}; + use hypersync_client::{ArrowResponse, ArrowResponseData}; + use std::sync::Arc; + + let selection = Selection::parse(&["block.hash".into(), "block.NUMBER".into()]).unwrap(); + let schema = Schema::new(vec![ + Field::new("hash", DataType::Binary, false), + Field::new("number", DataType::UInt64, false), + ]); + let blocks = RecordBatch::try_new( + Arc::new(schema), + vec![ + 
Arc::new(arrow::array::BinaryArray::from(vec![[0xaa].as_slice()])), + Arc::new(UInt64Array::from(vec![100u64])), + ], + ) + .unwrap(); + let response = ArrowResponse { + archive_height: Some(100), + next_block: 100, + total_execution_time: 0, + data: ArrowResponseData { + blocks: vec![blocks], + ..Default::default() + }, + rollback_guard: None, + }; + + let out = render_arrow_response(&selection, &response, &Masks::default()); + assert_eq!(out, "blocks[1]{hash,NUMBER}:\n 0xaa,100\n"); + } + + #[test] + fn empty_batches_render_zero_rows() { + let s = extract_rows(&[], &["number".to_string()], &[ValueKind::Numeric], None); + assert!(s.is_empty()); + } +} diff --git a/packages/cli/src/data/where_filter.rs b/packages/cli/src/data/where_filter.rs new file mode 100644 index 000000000..0f7098071 --- /dev/null +++ b/packages/cli/src/data/where_filter.rs @@ -0,0 +1,745 @@ +use anyhow::{anyhow, bail, Context, Result}; +use serde_json::Value; + +use hypersync_client::net_types::{ + BlockFilter, FieldSelection, LogFilter, Query, TransactionFilter, +}; + +use super::mapping::{self, Section, ServerFilter, TypedField, ValueKind}; + +#[derive(Debug, Clone)] +pub struct FieldFilter { + pub field: TypedField, + pub values: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CmpOp { + Gt, + Gte, + Lt, + Lte, +} + +impl CmpOp { + pub fn as_str(self) -> &'static str { + match self { + CmpOp::Gt => "_gt", + CmpOp::Gte => "_gte", + CmpOp::Lt => "_lt", + CmpOp::Lte => "_lte", + } + } +} + +#[derive(Debug, Clone)] +pub enum Cond { + In(Vec), + Cmp(CmpOp, Value), +} + +/// A filter evaluated client-side because the field or operator can't be pushed +/// to the Hypersync query. All conditions are AND'd together. 
+#[derive(Debug, Clone)] +pub struct ClientFilter { + pub field: TypedField, + pub conds: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct WhereFilter { + pub from_block: Option, + pub to_block_exclusive: Option, + pub server_filters: Vec, + pub client_filters: Vec, +} + +impl WhereFilter { + pub fn parse(raw: Option<&str>) -> Result { + let Some(raw) = raw else { + return Ok(Self::default()); + }; + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Ok(Self::default()); + } + + let value: Value = parse_where(trimmed)?; + let root = match value { + Value::Object(map) => map, + _ => bail!("`--where` must be an object/mapping at the top level."), + }; + + let mut out = WhereFilter::default(); + for (section_raw, body) in root { + if section_raw == "knownHeight" { + bail!("`knownHeight` is not a filter — pass it as a positional field instead."); + } + let section = mapping::parse_section(§ion_raw).ok_or_else(|| { + anyhow!( + "Unknown section `{section_raw}` in --where. Valid sections: {sections}.", + sections = mapping::ALLOWED_SECTIONS.join(", "), + ) + })?; + let fields = match body { + Value::Object(m) => m, + other => bail!( + "Expected object under `{section_raw}` in --where, got {t}.", + t = type_name(&other), + ), + }; + for (field_raw, field_body) in fields { + let typed_field = mapping::lookup(section, &field_raw).ok_or_else(|| { + let valid = mapping::valid_indexer_names(section).join(", "); + anyhow!("Unknown field `{section_raw}.{field_raw}` in --where. 
Valid: {valid}.") + })?; + apply_field(&mut out, section, &field_raw, typed_field, field_body)?; + } + } + + if let (Some(from), Some(to_excl)) = (out.from_block, out.to_block_exclusive) { + if to_excl <= from { + bail!("Block range is empty: from_block={from}, to_block(exclusive)={to_excl}."); + } + } + + Ok(out) + } + + pub fn has_section_filters(&self) -> bool { + !self.server_filters.is_empty() || !self.client_filters.is_empty() + } + + fn narrow_from(&mut self, n: u64) { + self.from_block = Some(self.from_block.map_or(n, |cur| cur.max(n))); + } + + fn narrow_to_excl(&mut self, n: u64) { + self.to_block_exclusive = Some(self.to_block_exclusive.map_or(n, |cur| cur.min(n))); + } + + /// Fields referenced by client-side filters. These must be fetched even when + /// not part of the user's output selection so the predicate can be evaluated. + pub fn client_filter_fields(&self) -> Vec { + self.client_filters.iter().map(|f| f.field).collect() + } + + pub fn build_net_query(&self, field_selection: FieldSelection) -> Result { + let mut query = Query::new().from_block(self.from_block.unwrap_or(0)); + if let Some(to) = self.to_block_exclusive { + query = query.to_block_excl(to); + } + + let wants_log_fields = !field_selection.log.is_empty(); + let wants_tx_fields = !field_selection.transaction.is_empty(); + let wants_block_fields = !field_selection.block.is_empty(); + + query.field_selection = field_selection; + + let mut logs = LogFilter::all(); + let mut transactions = TransactionFilter::all(); + let mut blocks = BlockFilter::all(); + let (mut has_log, mut has_tx, mut has_block) = (false, false, false); + for f in &self.server_filters { + match f.field.section() { + Section::Log => { + logs = apply_log_filter(logs, f)?; + has_log = true; + } + Section::Transaction => { + transactions = apply_tx_filter(transactions, f)?; + has_tx = true; + } + Section::Block => { + blocks = apply_block_filter(blocks, f)?; + has_block = true; + } + } + } + + // HyperSync returns only 
rows that match a selection, so request one for + // every entity the user wants data for. Without a filter the `all()` + // selection matches every row of that kind. + if has_log || wants_log_fields { + query = query.where_logs(logs); + } + if has_tx || wants_tx_fields { + query = query.where_transactions(transactions); + } + if has_block { + query = query.where_blocks(blocks); + } + + // Block headers are otherwise returned only for blocks joined to a + // matching log/transaction/block filter. When block fields are wanted but + // no filter scopes the blocks, ask the server for every block in the range. + if wants_block_fields && !has_log && !has_tx && !has_block { + query = query.include_all_blocks(); + } + + Ok(query) + } +} + +fn server_tag(field: TypedField) -> ServerFilter { + field + .spec() + .server + .expect("server_filters only holds server-filterable fields") +} + +fn str_refs(values: &[String]) -> Vec<&str> { + values.iter().map(String::as_str).collect() +} + +fn apply_log_filter(filter: LogFilter, f: &FieldFilter) -> Result { + let owned = filter_values_as_strs(&f.values); + let refs = str_refs(&owned); + let (filter, ctx) = match server_tag(f.field) { + ServerFilter::LogAddress => (filter.and_address(refs), "invalid address"), + ServerFilter::LogTopic0 => (filter.and_topic0(refs), "invalid topic0"), + ServerFilter::LogTopic1 => (filter.and_topic1(refs), "invalid topic1"), + ServerFilter::LogTopic2 => (filter.and_topic2(refs), "invalid topic2"), + ServerFilter::LogTopic3 => (filter.and_topic3(refs), "invalid topic3"), + _ => unreachable!("non-log server tag in log section"), + }; + filter.context(ctx) +} + +fn apply_tx_filter(filter: TransactionFilter, f: &FieldFilter) -> Result { + match server_tag(f.field) { + ServerFilter::TxStatus => { + let [value] = f.values.as_slice() else { + bail!("`transaction.status` accepts a single value server-side."); + }; + Ok(filter.and_status(value_to_u8(value)?)) + } + ServerFilter::TxType => 
Ok(filter.and_type(values_to_u8(&f.values)?)), + tag => { + let owned = filter_values_as_strs(&f.values); + let refs = str_refs(&owned); + let (filter, ctx) = match tag { + ServerFilter::TxFrom => (filter.and_from(refs), "invalid from address"), + ServerFilter::TxTo => (filter.and_to(refs), "invalid to address"), + ServerFilter::TxSighash => (filter.and_sighash(refs), "invalid sighash"), + ServerFilter::TxHash => (filter.and_hash(refs), "invalid transaction hash"), + ServerFilter::TxContractAddress => ( + filter.and_contract_address(refs), + "invalid contract address", + ), + _ => unreachable!("non-transaction server tag in transaction section"), + }; + filter.context(ctx) + } + } +} + +fn apply_block_filter(filter: BlockFilter, f: &FieldFilter) -> Result { + let owned = filter_values_as_strs(&f.values); + let refs = str_refs(&owned); + let (filter, ctx) = match server_tag(f.field) { + ServerFilter::BlockHash => (filter.and_hash(refs), "invalid block hash"), + ServerFilter::BlockMiner => (filter.and_miner(refs), "invalid miner address"), + _ => unreachable!("non-block server tag in block section"), + }; + filter.context(ctx) +} + +fn parse_where(raw: &str) -> Result { + json5::from_str::(raw).context( + "Failed to parse --where. 
Expected JSON-like object, e.g.\n\
+         --where='{ block: { number: { _gte: 1000, _lte: 2000 } }, log: { srcAddress: \"0xabc\" } }'",
+    )
+}
+
+fn type_name(v: &Value) -> &'static str {
+    match v {
+        Value::Null => "null",
+        Value::Bool(_) => "bool",
+        Value::Number(_) => "number",
+        Value::String(_) => "string",
+        Value::Array(_) => "array",
+        Value::Object(_) => "object",
+    }
+}
+
+fn filter_values_as_strs(values: &[Value]) -> Vec<String> {
+    values
+        .iter()
+        .map(|v| match v {
+            Value::String(s) => s.clone(),
+            other => other.to_string(),
+        })
+        .collect()
+}
+
+/// Routes one `--where` field to either the server-side or the client-side
+/// filter list of `out`.
+///
+/// `block.number` is delegated to `apply_block_number`. Any other field is
+/// pushed server-side only when it has a server mapping and its body parsed to
+/// exactly one non-empty membership set; every other shape is kept as a client
+/// filter.
+///
+/// Returns an error when the field body cannot be parsed into conditions.
+fn apply_field(
+    out: &mut WhereFilter,
+    section: Section,
+    indexer_name: &str,
+    typed_field: TypedField,
+    body: Value,
+) -> Result<()> {
+    use hypersync_client::net_types::block::BlockField;
+    if matches!(typed_field, TypedField::Block(BlockField::Number)) {
+        return apply_block_number(out, typed_field, body);
+    }
+
+    let conds = parse_conditions(section, indexer_name, typed_field, body)?;
+
+    // Conditions on one field are AND'd. A server-side filter carries a single
+    // value list, so only a lone membership set can be pushed down: merging the
+    // values of several sets would turn the AND into an OR, and comparisons have
+    // no server-side form. An empty set must match nothing and is therefore kept
+    // client-side, like the empty `_in` case of `block.number`.
+    // `transaction.status` maps to a single `u8` server-side, so a multi-value
+    // set must fall back to client-side filtering.
+    let pushed_down = match (typed_field.spec().server, conds.as_slice()) {
+        (Some(tag), [Cond::In(values)])
+            if !values.is_empty()
+                && (values.len() == 1 || tag != ServerFilter::TxStatus) =>
+        {
+            Some(values.clone())
+        }
+        _ => None,
+    };
+
+    match pushed_down {
+        Some(values) => out.server_filters.push(FieldFilter {
+            field: typed_field,
+            values,
+        }),
+        None => out.client_filters.push(ClientFilter {
+            field: typed_field,
+            conds,
+        }),
+    }
+    Ok(())
+}
+
+fn parse_conditions(
+    section: Section,
+    name: &str,
+    field: TypedField,
+    body: Value,
+) -> Result<Vec<Cond>> {
+    let label = || format!("{}.{name}", section.as_indexer_str());
+    match body {
+        Value::String(_) | Value::Number(_) | Value::Bool(_) => Ok(vec![Cond::In(vec![body])]),
+        Value::Array(arr) => Ok(vec![Cond::In(arr)]),
+        Value::Object(map) => {
+            let mut conds = Vec::new();
+            for (k, v) in map {
+                let cond = match k.as_str() {
+                    "_eq" => Cond::In(vec![v]),
+                    "_in" => {
+                        let arr = v.as_array().ok_or_else(|| {
+                            anyhow!("`_in` on `{}` expects an array, got {}", label(), type_name(&v))
+                        })?;
+                        Cond::In(arr.clone())
+                    }
+                    "_gt" | "_gte" | "_lt" | "_lte" => {
+                        if field.spec().value_kind != ValueKind::Numeric {
+                            bail!(
+                                "Comparison operators are only supported on numeric fields; `{}` is not numeric. Use `_eq` or `_in`.",
+                                label(),
+                            );
+                        }
+                        let op = match k.as_str() {
+                            "_gt" => CmpOp::Gt,
+                            "_gte" => CmpOp::Gte,
+                            "_lt" => CmpOp::Lt,
+                            _ => CmpOp::Lte,
+                        };
+                        Cond::Cmp(op, v)
+                    }
+                    other => bail!(
+                        "Unsupported operator `{other}` on `{}`. Use a scalar, an array, `_eq`, `_in`, `_gt`, `_gte`, `_lt`, or `_lte`.",
+                        label(),
+                    ),
+                };
+                conds.push(cond);
+            }
+            Ok(conds)
+        }
+        Value::Null => bail!("`{}` cannot be null", label()),
+    }
+}
+
+/// `block.number` scopes the scan window rather than mapping to a row filter, so
+/// it desugars to `from_block`/`to_block`. A scalar or `_eq` pins a single block;
+/// an array or `_in` scans `[min, max]` and drops the rest with a client filter.
+fn apply_block_number(out: &mut WhereFilter, field: TypedField, body: Value) -> Result<()> { + let to_block = |v: &Value| { + value_to_u64(v) + .with_context(|| format!("`block.number` expects a non-negative integer, got {v}")) + }; + for cond in parse_conditions(Section::Block, "number", field, body)? { + match cond { + Cond::Cmp(op, v) => { + let n = to_block(&v)?; + match op { + CmpOp::Gte => out.narrow_from(n), + CmpOp::Gt => out.narrow_from(n.saturating_add(1)), + CmpOp::Lte => out.narrow_to_excl(n.saturating_add(1)), + CmpOp::Lt => out.narrow_to_excl(n), + } + } + Cond::In(vals) => { + let nums = vals.iter().map(to_block).collect::>>()?; + match (nums.iter().min(), nums.iter().max()) { + (Some(&min), Some(&max)) => { + out.narrow_from(min); + out.narrow_to_excl(max.saturating_add(1)); + // A single value is already an exact range; only a set + // needs the leftover blocks dropped client-side. + if nums.len() > 1 { + out.client_filters.push(ClientFilter { + field, + conds: vec![Cond::In(vals)], + }); + } + } + // Empty `_in` matches no block; the client filter drops all rows. 
+ _ => out.client_filters.push(ClientFilter { + field, + conds: vec![Cond::In(vals)], + }), + } + } + } + } + Ok(()) +} + +fn value_to_u64(v: &Value) -> Result { + match v { + Value::Number(n) => n + .as_u64() + .ok_or_else(|| anyhow!("Expected non-negative integer, got {n}")), + Value::String(s) => s.parse::().context("Failed to parse integer"), + _ => Err(anyhow!("Expected integer, got {}", type_name(v))), + } +} + +fn value_to_u8(v: &Value) -> Result { + match v { + Value::Number(n) => n + .as_u64() + .and_then(|x| u8::try_from(x).ok()) + .ok_or_else(|| anyhow!("Expected an integer between 0 and 255, got {n}")), + Value::String(s) => { + let s = s.trim(); + match s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) { + Some(hex) => u8::from_str_radix(hex, 16), + None => s.parse::(), + } + .with_context(|| format!("Expected an integer between 0 and 255, got \"{s}\"")) + } + _ => Err(anyhow!( + "Expected an integer between 0 and 255, got {}", + type_name(v) + )), + } +} + +fn values_to_u8(values: &[Value]) -> Result> { + values.iter().map(value_to_u8).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn pf(raw: &str) -> WhereFilter { + WhereFilter::parse(Some(raw)).unwrap() + } + + #[test] + fn json5_block_range_and_address() { + let f = + pf("{ block: { number: { _gte: 1000, _lte: 2000 } }, log: { srcAddress: '0xa0b8' } }"); + assert_eq!( + (f.from_block, f.to_block_exclusive), + (Some(1000), Some(2001)) + ); + assert_eq!( + ( + f.server_filters.len(), + f.server_filters[0].field.camel_name() + ), + (1, "srcAddress".to_string()) + ); + } + + #[test] + fn gt_and_lt_off_by_one() { + let f = pf("{ block: { number: { _gt: 100, _lt: 200 } } }"); + assert_eq!((f.from_block, f.to_block_exclusive), (Some(101), Some(200))); + } + + #[test] + fn block_number_eq_pins_single_block() { + let f = pf("{ block: { number: { _eq: 100 } } }"); + assert_eq!( + (f.from_block, f.to_block_exclusive, f.client_filters.len()), + (Some(100), 
Some(101), 0), + ); + } + + #[test] + fn block_number_scalar_shorthand_pins_single_block() { + let f = pf("{ block: { number: 100 } }"); + assert_eq!( + (f.from_block, f.to_block_exclusive, f.client_filters.len()), + (Some(100), Some(101), 0), + ); + } + + #[test] + fn block_number_in_scans_range_and_filters_rest_client_side() { + let f = pf("{ block: { number: { _in: [100, 50, 200] } } }"); + assert_eq!( + ( + f.from_block, + f.to_block_exclusive, + f.client_filters.len(), + f.client_filters[0].field.camel_name(), + ), + (Some(50), Some(201), 1, "number".to_string()), + ); + } + + #[test] + fn block_number_array_shorthand_matches_in() { + let f = pf("{ block: { number: [100, 50, 200] } }"); + assert_eq!( + (f.from_block, f.to_block_exclusive, f.client_filters.len()), + (Some(50), Some(201), 1), + ); + } + + #[test] + fn block_number_single_element_in_needs_no_client_filter() { + let f = pf("{ block: { number: { _in: [42] } } }"); + assert_eq!( + (f.from_block, f.to_block_exclusive, f.client_filters.len()), + (Some(42), Some(43), 0), + ); + } + + #[test] + fn block_number_in_builds_full_block_scan_over_range() { + use hypersync_client::net_types::block::BlockField; + let mut fs = FieldSelection::default(); + fs.block.insert(BlockField::Number); + let q = pf("{ block: { number: { _in: [10, 12] } } }") + .build_net_query(fs) + .unwrap(); + assert_eq!( + (q.from_block, q.to_block, q.include_all_blocks), + (10, Some(13), true), + ); + } + + #[test] + fn block_number_unsupported_operator_errors() { + let err = WhereFilter::parse(Some("{ block: { number: { _like: 5 } } }")) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Unsupported operator `_like` on `block.number`. 
Use a scalar, an array, `_eq`, `_in`, `_gt`, `_gte`, `_lt`, or `_lte`."); + } + + #[test] + fn empty_where_defaults() { + let f = WhereFilter::parse(None).unwrap(); + let q = f.build_net_query(FieldSelection::default()).unwrap(); + assert_eq!((q.from_block, q.to_block), (0, None)); + } + + #[test] + fn block_only_selection_requests_all_blocks() { + use hypersync_client::net_types::block::BlockField; + let mut fs = FieldSelection::default(); + fs.block.insert(BlockField::Hash); + let q = WhereFilter::parse(None) + .unwrap() + .build_net_query(fs) + .unwrap(); + assert_eq!( + ( + q.include_all_blocks, + q.blocks.len(), + q.logs.len(), + q.transactions.len(), + ), + (true, 0, 0, 0), + ); + } + + #[test] + fn log_only_selection_requests_all_logs() { + use hypersync_client::net_types::log::LogField; + let mut fs = FieldSelection::default(); + fs.log.insert(LogField::Data); + let q = WhereFilter::parse(None) + .unwrap() + .build_net_query(fs) + .unwrap(); + assert_eq!( + (q.include_all_blocks, q.logs.len(), q.transactions.len()), + (false, 1, 0), + ); + } + + #[test] + fn transaction_only_selection_requests_all_transactions() { + use hypersync_client::net_types::transaction::TransactionField; + let mut fs = FieldSelection::default(); + fs.transaction.insert(TransactionField::Hash); + let q = WhereFilter::parse(None) + .unwrap() + .build_net_query(fs) + .unwrap(); + assert_eq!( + (q.include_all_blocks, q.transactions.len(), q.logs.len()), + (false, 1, 0), + ); + } + + #[test] + fn block_fields_scoped_by_log_filter_skip_all_blocks() { + use hypersync_client::net_types::block::BlockField; + let mut fs = FieldSelection::default(); + fs.block.insert(BlockField::Hash); + let q = pf("{ log: { srcAddress: '0xdAC17F958D2ee523a2206206994597C13D831ec7' } }") + .build_net_query(fs) + .unwrap(); + assert_eq!((q.include_all_blocks, q.logs.len()), (false, 1)); + } + + #[test] + fn known_height_in_where_errors() { + let err = WhereFilter::parse(Some("{ knownHeight: 100 }")) + 
.unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"`knownHeight` is not a filter — pass it as a positional field instead."); + } + + #[test] + fn unknown_field_errors() { + let err = WhereFilter::parse(Some("{ log: { foo: 'x' } }")) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Unknown field `log.foo` in --where. Valid: transactionHash, blockHash, blockNumber, transactionIndex, logIndex, srcAddress, data, removed, topic0, topic1, topic2, topic3."); + } + + #[test] + fn empty_range_errors() { + let err = WhereFilter::parse(Some("{ block: { number: { _gte: 100, _lt: 100 } } }")) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @"Block range is empty: from_block=100, to_block(exclusive)=100."); + } + + #[test] + fn malformed_input_error() { + let err = WhereFilter::parse(Some("{ block: }")) + .unwrap_err() + .to_string(); + insta::assert_snapshot!(err, @r#"Failed to parse --where. Expected JSON-like object, e.g. +--where='{ block: { number: { _gte: 1000, _lte: 2000 } }, log: { srcAddress: "0xabc" } }'"#); + } + + #[test] + fn transaction_filters() { + let f = pf("{ transaction: { from: '0xa0b8', sighash: '0xdead' } }"); + assert_eq!(f.server_filters.len(), 2); + } + + #[test] + fn hash_and_contract_address_are_server_side() { + let f = pf("{ transaction: { hash: '0xa0b8', contractAddress: '0xdead' } }"); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (2, 0)); + } + + #[test] + fn numeric_field_with_membership_is_client_side() { + // `transaction.gas` has no Hypersync builder → client-side even for `_in`. 
+ let f = pf("{ transaction: { gas: [21000, 50000] } }"); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (0, 1)); + } + + #[test] + fn status_and_type_are_server_side() { + let f = pf("{ transaction: { status: 1, type: [0, 2] } }"); + let q = f.build_net_query(FieldSelection::default()).unwrap(); + assert_eq!( + ( + f.server_filters.len(), + f.client_filters.len(), + q.transactions.len(), + ), + (2, 0, 1), + ); + } + + #[test] + fn multi_value_status_falls_back_to_client() { + // `and_status` takes a single value, so a set must filter client-side. + let f = pf("{ transaction: { status: { _in: [0, 1] } } }"); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (0, 1)); + } + + #[test] + fn block_hash_and_miner_are_server_side() { + let f = pf("{ block: { \ + hash: '0x1111111111111111111111111111111111111111111111111111111111111111', \ + miner: '0x2222222222222222222222222222222222222222' } }"); + let q = f.build_net_query(FieldSelection::default()).unwrap(); + assert_eq!( + ( + f.server_filters.len(), + f.client_filters.len(), + q.blocks.len(), + ), + (2, 0, 1), + ); + } + + #[test] + fn other_block_field_is_client_side() { + // `block.timestamp` has no Hypersync builder → client-side. 
+ let f = pf("{ block: { timestamp: { _gte: 1000 } } }"); + assert_eq!((f.server_filters.len(), f.client_filters.len()), (0, 1)); + } + + #[test] + fn trailing_commas_and_comments() { + let f = pf( + "{ // comment\n block: { number: { _gte: 100, } },\n log: { srcAddress: '0xa', }, }", + ); + assert_eq!((f.from_block, f.server_filters.len()), (Some(100), 1)); + } + + #[test] + fn case_insensitive_block_range() { + let f = pf("{ block: { NUMBER: { _gte: 500 } } }"); + assert_eq!(f.from_block, Some(500)); + } + + #[test] + fn case_insensitive_where_fields() { + let f = pf("{ log: { src_address: '0xa', TOPIC0: '0xb' } }"); + let names: Vec = f + .server_filters + .iter() + .map(|f| f.field.camel_name()) + .collect(); + assert_eq!(names, vec!["srcAddress", "topic0"]); + } +} diff --git a/packages/cli/src/executor/data.rs b/packages/cli/src/executor/data.rs new file mode 100644 index 000000000..072fee1ba --- /dev/null +++ b/packages/cli/src/executor/data.rs @@ -0,0 +1,324 @@ +use anyhow::{anyhow, bail, Context, Result}; +use serde_json::Value; +use std::time::Duration; + +use crate::cli_args::clap_definitions::DataArgs; +use crate::data::{ + chain::{self, Chain}, + client_filter, + field_selection::Selection, + mapping::Section, + toon, + where_filter::{ClientFilter, Cond, FieldFilter, WhereFilter}, +}; + +enum PaginationState { + RangeDone, + ReachedHead, + MorePages, +} + +/// Below this many rows a single page is too sparse to bother the user with a +/// manual next-page command, so we fetch the following pages automatically. 
+const AUTO_PAGINATE_MIN_ROWS: usize = 100; + +pub async fn run(args: DataArgs) -> Result<()> { + let token = resolve_api_token().ok_or_else(missing_token_error)?; + + let chain = chain::resolve(&args.chain)?; + let selection = Selection::parse(&args.fields)?; + let mut filter = WhereFilter::parse(args.where_filter.as_deref())?; + let client = build_client(&chain, &token)?; + + if selection.known_height + && !selection.has_data_fields() + && filter.from_block.is_none() + && filter.to_block_exclusive.is_none() + && !filter.has_section_filters() + { + let height = client + .get_height() + .await + .context("Failed fetching chain height")?; + print!("{}", toon::render_height(height)); + eprintln!(); + eprintln!("Chain {} is at height {}.", chain.display, height); + return Ok(()); + } + + if !selection.has_data_fields() && !selection.known_height { + bail!("No data fields requested. Pass at least one positional field."); + } + + let extra_fields = filter.client_filter_fields(); + let mut tables: Vec = Vec::new(); + let mut total_rows = 0usize; + + let (state, archive_height, next_block) = loop { + let field_selection = selection.build_net_field_selection_with(&extra_fields); + let query = filter.build_net_query(field_selection)?; + let response = client + .get_arrow(&query) + .await + .context("Failed querying blockchain data")?; + + let masks = client_filter::compute_masks(&response, &filter.client_filters)?; + for table in toon::collect_sections(&selection, &response, &masks) { + total_rows += table.rows.len(); + merge_table(&mut tables, table); + } + + let archive_height = response.archive_height.unwrap_or(0); + let next_block = response.next_block; + + let state = if matches!(filter.to_block_exclusive, Some(end) if next_block >= end) { + PaginationState::RangeDone + } else if next_block >= archive_height { + PaginationState::ReachedHead + } else { + PaginationState::MorePages + }; + + if matches!(state, PaginationState::MorePages) && total_rows < 
AUTO_PAGINATE_MIN_ROWS { + filter.from_block = Some(next_block); + continue; + } + break (state, archive_height, next_block); + }; + + let mut out = toon::render_sections(&tables); + if selection.known_height { + out.push_str(&toon::render_height(archive_height)); + } + print!("{out}"); + + match state { + PaginationState::RangeDone => {} + PaginationState::ReachedHead => { + eprintln!(); + eprintln!( + "Reached the chain head at block {next_block}. \ + Rerun the following command later to fetch newly available data:" + ); + print_next_command(&args, &chain, &filter, next_block); + } + PaginationState::MorePages => { + eprintln!(); + eprintln!( + "Got a response up to block {next_block}. \ + To get the next page, run the following command:" + ); + print_next_command(&args, &chain, &filter, next_block); + } + } + + Ok(()) +} + +fn merge_table(tables: &mut Vec, table: toon::SectionTable) { + match tables.iter_mut().find(|t| t.section == table.section) { + Some(existing) => existing.rows.extend(table.rows), + None => tables.push(table), + } +} + +fn print_next_command(args: &DataArgs, chain: &Chain, filter: &WhereFilter, next_block: u64) { + eprintln!(" envio data {} \\", args.fields.join(" ")); + eprintln!(" --chain={} \\", chain.display); + eprintln!( + " --where='{body}'", + body = render_where_hint(filter, next_block), + ); +} + +fn build_client(chain: &Chain, token: &str) -> Result { + let user_agent = format!( + "envio-data/{}", + crate::config_parsing::system_config::VERSION, + ); + hypersync_client::Client::new_with_agent( + hypersync_client::ClientConfig { + url: chain.base_url.clone(), + api_token: token.to_string(), + http_req_timeout_millis: Duration::from_secs(60).as_millis() as u64, + ..Default::default() + }, + user_agent, + ) + .context("Failed building blockchain data client") +} + +fn render_where_hint(filter: &WhereFilter, next_block: u64) -> String { + let mut block: Vec = Vec::new(); + let mut transaction: Vec = Vec::new(); + let mut log: Vec = 
Vec::new(); + + // A `block.number` set scans [min, max] and drops the rest client-side, so + // the next page only needs the values not yet covered by this response. + if let Some(set) = block_number_set(filter) { + let remaining: Vec = set + .iter() + .filter(|v| value_as_u64(v).is_none_or(|n| n >= next_block)) + .cloned() + .collect(); + block.push(format!( + "number: {{ _in: {} }}", + json_string(&Value::Array(remaining)) + )); + } else { + let mut range = format!("number: {{ _gte: {next_block}"); + if let Some(end_excl) = filter.to_block_exclusive { + let lte = end_excl.saturating_sub(1); + range.push_str(&format!(", _lte: {lte}")); + } + range.push_str(" }"); + block.push(range); + } + + for f in &filter.server_filters { + let entry = render_server_field(f); + match f.field.section() { + Section::Block => block.push(entry), + Section::Transaction => transaction.push(entry), + Section::Log => log.push(entry), + } + } + for c in &filter.client_filters { + if is_block_number(c) { + continue; + } + let entry = render_client_field(c); + match c.field.section() { + Section::Block => block.push(entry), + Section::Transaction => transaction.push(entry), + Section::Log => log.push(entry), + } + } + + let parts: Vec = [("block", block), ("transaction", transaction), ("log", log)] + .into_iter() + .filter(|(_, entries)| !entries.is_empty()) + .map(|(name, entries)| format!("{name}: {{ {} }}", entries.join(", "))) + .collect(); + format!("{{ {} }}", parts.join(", ")) +} + +fn is_block_number(c: &ClientFilter) -> bool { + c.field.section() == Section::Block && c.field.camel_name() == "number" +} + +fn block_number_set(filter: &WhereFilter) -> Option<&[Value]> { + filter + .client_filters + .iter() + .find_map(|c| match c.conds.as_slice() { + [Cond::In(vals)] if is_block_number(c) => Some(vals.as_slice()), + _ => None, + }) +} + +fn render_server_field(f: &FieldFilter) -> String { + let v = if f.values.len() == 1 { + json_string(&f.values[0]) + } else { + 
json_string(&Value::Array(f.values.clone())) + }; + format!("{name}: {v}", name = f.field.camel_name()) +} + +fn render_client_field(c: &ClientFilter) -> String { + let name = c.field.camel_name(); + if let [Cond::In(vals)] = c.conds.as_slice() { + let v = if vals.len() == 1 { + json_string(&vals[0]) + } else { + json_string(&Value::Array(vals.clone())) + }; + return format!("{name}: {v}"); + } + let ops: Vec = c + .conds + .iter() + .map(|cond| match cond { + Cond::In(vals) if vals.len() == 1 => format!("_eq: {}", json_string(&vals[0])), + Cond::In(vals) => format!("_in: {}", json_string(&Value::Array(vals.clone()))), + Cond::Cmp(op, v) => format!("{}: {}", op.as_str(), json_string(v)), + }) + .collect(); + format!("{name}: {{ {} }}", ops.join(", ")) +} + +fn json_string(v: &Value) -> String { + serde_json::to_string(v).unwrap_or_else(|_| "".into()) +} + +fn value_as_u64(v: &Value) -> Option { + match v { + Value::Number(n) => n.as_u64(), + Value::String(s) => s.trim().parse::().ok(), + _ => None, + } +} + +fn resolve_api_token() -> Option { + let from_env = std::env::var("ENVIO_API_TOKEN").ok(); + let from_dotenv = || { + use dotenvy::{EnvLoader, EnvSequence}; + EnvLoader::with_path(".env") + .sequence(EnvSequence::InputOnly) + .load() + .ok() + .and_then(|m| m.var("ENVIO_API_TOKEN").ok()) + }; + from_env + .or_else(from_dotenv) + .filter(|s| !s.trim().is_empty()) +} + +fn missing_token_error() -> anyhow::Error { + anyhow!( + "ENVIO_API_TOKEN is not set.\n\ + Set the ENVIO_API_TOKEN environment variable in your .env file.\n\ + Get a free API token at: https://envio.dev/app/api-tokens" + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn hint_round_trips_server_and_client_filters() { + let filter = WhereFilter::parse(Some( + "{ block: { number: { _lte: 2000 } }, log: { srcAddress: '0xabc', data: '0xdead' }, transaction: { value: { _gt: 1000 } } }", + )) + .unwrap(); + let hint = render_where_hint(&filter, 1500); 
+ assert_eq!( + hint, + "{ block: { number: { _gte: 1500, _lte: 2000 } }, transaction: { value: { _gt: 1000 } }, log: { srcAddress: \"0xabc\", data: \"0xdead\" } }", + ); + } + + #[test] + fn hint_carries_only_unfetched_block_number_set() { + let filter = + WhereFilter::parse(Some("{ block: { number: { _in: [100, 50, 200] } } }")).unwrap(); + let hint = render_where_hint(&filter, 120); + assert_eq!(hint, "{ block: { number: { _in: [200] } } }"); + } + + #[test] + fn hint_includes_block_and_status_filters() { + let filter = WhereFilter::parse(Some( + "{ block: { miner: '0xbeef' }, transaction: { status: 1, type: [0, 2] } }", + )) + .unwrap(); + let hint = render_where_hint(&filter, 100); + assert_eq!( + hint, + "{ block: { number: { _gte: 100 }, miner: \"0xbeef\" }, transaction: { status: 1, type: [0,2] } }", + ); + } +} diff --git a/packages/cli/src/executor/mod.rs b/packages/cli/src/executor/mod.rs index f66dddcb6..3b98dd87b 100644 --- a/packages/cli/src/executor/mod.rs +++ b/packages/cli/src/executor/mod.rs @@ -10,6 +10,7 @@ use crate::{ mod codegen; mod config; +mod data; mod dev; pub mod init; mod local; @@ -87,6 +88,11 @@ pub async fn execute( Ok(None) } + CommandType::Data(args) => { + data::run(args).await?; + Ok(None) + } + CommandType::Skills(SkillsSubcommand::Update) => { skills::run_update(&parsed_project_paths)?; Ok(None) diff --git a/packages/cli/src/lib.rs b/packages/cli/src/lib.rs index 00afaf4a7..1c5d8109f 100644 --- a/packages/cli/src/lib.rs +++ b/packages/cli/src/lib.rs @@ -4,6 +4,7 @@ pub use cli_args::init_config; mod commands; pub mod config_parsing; pub mod constants; +pub mod data; pub mod docker_env; mod evm; pub mod executor; diff --git a/packages/cli/src/template_dirs.rs b/packages/cli/src/template_dirs.rs index 81f410c66..d0406bc88 100644 --- a/packages/cli/src/template_dirs.rs +++ b/packages/cli/src/template_dirs.rs @@ -432,6 +432,7 @@ mod test { assert_eq!( names, vec![ + "envio-data", "envio-docs", "indexer-blocks", 
"indexer-configuration", diff --git a/packages/cli/templates/static/shared/.claude/skills/envio-data/SKILL.md b/packages/cli/templates/static/shared/.claude/skills/envio-data/SKILL.md new file mode 100644 index 000000000..7581f313c --- /dev/null +++ b/packages/cli/templates/static/shared/.claude/skills/envio-data/SKILL.md @@ -0,0 +1,56 @@ +--- +name: envio-data +description: >- + Query raw blockchain data — block ranges, event lookups, transactions, chain height — + via `envio data`. Use instead of curl or web searches for block discovery. +metadata: + managed-by: envio +--- + +# `envio data` + +**Do NOT web-search for block ranges.** Use `envio data` instead. + +```bash +envio data ... --chain= [--where=''] +``` + +- **Fields**: any EVM block/log/transaction field, plus `knownHeight`. + Examples: `block.number`, `log.srcAddress`, `transaction.hash`. + Case-insensitive — `gasLimit`, `gas_limit`, `GASLIMIT` all work. +- **--chain**: numeric id (`8453`) or name (`base`, `arbitrum-one`). +- **--where**: JSON5, grouping fields under `block`, `transaction`, `log`. + Any field can be filtered: + - Match with a scalar, an array, `_eq`, or `_in` (e.g. `log: { srcAddress: "0x..." }`). + - Compare numeric fields with `_gt`, `_gte`, `_lt`, `_lte` (e.g. `transaction: { value: { _gt: 1000000000000000000 } }`). + - Comparison ops are numeric-only; hex/bool fields take only `_eq`/`_in`. 
+ +## Examples + +Find when a contract started emitting an event: + +```bash +envio data block.number log.transactionHash \ + --chain=base \ + --where='{ + block: { number: { _gte: 0 } }, + log: { srcAddress: "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" }, + }' +``` + +Filter on any field, including numeric comparisons: + +```bash +envio data transaction.hash transaction.value \ + --chain=base \ + --where='{ + block: { number: { _gte: 1000000, _lt: 1000100 } }, + transaction: { value: { _gt: 1000000000000000000 } }, + }' +``` + +Get the current chain height: + +```bash +envio data knownHeight --chain=arbitrum-one +``` diff --git a/packages/cli/templates/static/shared/.claude/skills/indexer-testing/SKILL.md b/packages/cli/templates/static/shared/.claude/skills/indexer-testing/SKILL.md index ee738dc4b..199ac0c27 100644 --- a/packages/cli/templates/static/shared/.claude/skills/indexer-testing/SKILL.md +++ b/packages/cli/templates/static/shared/.claude/skills/indexer-testing/SKILL.md @@ -134,35 +134,6 @@ pnpm test # Run all tests pnpm test -- -u # Update snapshots ``` -## Advanced: Finding Block Ranges with HyperSync - -Auto-exit mode eliminates the need for manual block discovery in most cases. Use this when you need specific block ranges for pinned snapshots. - -**Do NOT web-search for block ranges.** Query HyperSync directly. Endpoint pattern: `https://{chainId}.hypersync.xyz` (e.g., chain 1 → `https://1.hypersync.xyz`). - -Common chain IDs: 1 (Ethereum), 8453 (Base), 42161 (Arbitrum), 10 (Optimism), 137 (Polygon), 56 (BSC), 43114 (Avalanche), 100 (Gnosis), 59144 (Linea), 534352 (Scroll), 81457 (Blast), 42220 (Celo). 
- -```bash -curl --request POST \ - --url https://1.hypersync.xyz/query \ - --header 'Content-Type: application/json' \ - --header "Authorization: Bearer $ENVIO_API_TOKEN" \ - --data '{ - "from_block": 0, - "logs": [ - { - "address": ["0xYOUR_CONTRACT_ADDRESS"], - "topics": [ - ["0xYOUR_EVENT_TOPIC0"] - ] - } - ], - "field_selection": { - "log": ["block_number"] - } - }' -``` - -Returns the earliest matching blocks. Use `from_block` to paginate forward. Pick a tight range (50–200 blocks) for fast, deterministic tests. - > If something is unclear, use the `envio-docs` skill to search and read the latest documentation. + +> To look up block ranges or query raw chain data for a test, use the `envio-data` skill instead of curl or web searches. diff --git a/packages/cli/tests/data_integration.rs b/packages/cli/tests/data_integration.rs new file mode 100644 index 000000000..8e4afabf5 --- /dev/null +++ b/packages/cli/tests/data_integration.rs @@ -0,0 +1,180 @@ +//! End-to-end integration tests for `envio data`. +//! +//! Invokes the CLI binary (via the `script` example) and asserts on stdout/stderr. +//! Runs as part of the normal `cargo test` flow; each test silently skips when +//! `ENVIO_API_TOKEN` is absent so the suite stays green locally and on forks +//! that don't have access to the secret. 
+ +use std::process::Command; + +fn skip_without_token() -> bool { + let has_token = std::env::var_os("ENVIO_API_TOKEN").is_some_and(|v| !v.is_empty()); + if !has_token { + eprintln!("skipping: ENVIO_API_TOKEN not set"); + } + !has_token +} + +struct Output { + stdout: String, + stderr: String, + ok: bool, +} + +impl std::fmt::Display for Output { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "exit: {}\n--- stdout ---\n{}\n--- stderr ---\n{}", + if self.ok { "ok" } else { "FAIL" }, + self.stdout, + self.stderr, + ) + } +} + +impl Output { + fn stderr_template(&self) -> String { + // `\b\d+\b` only matches standalone integer literals — digits embedded + // in hex strings like `0xdAC17F958...` keep their literal characters + // so the assertion can compare the address bytes exactly. + regex::Regex::new(r"\b\d+\b") + .unwrap() + .replace_all(self.stderr.trim(), "") + .into_owned() + } +} + +fn envio_data(args: &[&str]) -> Output { + let output = Command::new("cargo") + .args(["run", "--quiet", "--example", "script", "--", "data"]) + .args(args) + .output() + .expect("failed to execute envio data"); + Output { + stdout: String::from_utf8_lossy(&output.stdout).to_string(), + stderr: String::from_utf8_lossy(&output.stderr).to_string(), + ok: output.status.success(), + } +} + +#[test] +fn height_returns_a_number() { + if skip_without_token() { + return; + } + let out = envio_data(&["knownHeight", "--chain=base"]); + assert!(out.ok, "envio data failed:\n{out}"); + + let stripped = out + .stdout + .strip_prefix("knownHeight: ") + .and_then(|s| s.strip_suffix('\n')) + .unwrap_or_else(|| panic!("unexpected stdout:\n{out}")); + let height: u64 = stripped + .parse() + .unwrap_or_else(|_| panic!("height should be a number:\n{out}")); + assert!(height > 1_000_000, "height suspiciously low:\n{out}"); + + assert_eq!( + out.stderr_template(), + "Chain base is at height .", + "unexpected stderr:\n{out}", + ); +} + +/// Historical block 
20000000 on Ethereum mainnet — deterministic output. +/// Queries USDT Transfer events (11 in this block). +#[test] +fn query_returns_deterministic_block_and_log_data() { + if skip_without_token() { + return; + } + let out = envio_data(&[ + "block.number", + "block.gasUsed", + "log.srcAddress", + "log.logIndex", + "--chain=1", + "--where={ block: { number: { _gte: 20000000, _lte: 20000000 } }, log: { srcAddress: \"0xdAC17F958D2ee523a2206206994597C13D831ec7\", topic0: \"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef\" } }", + ]); + assert!(out.ok, "envio data failed:\n{out}"); + + assert_eq!( + out.stdout, + "\ +blocks[1]{number,gasUsed}: + 20000000,11089692 +logs[11]{srcAddress,logIndex}: + 0xdac17f958d2ee523a2206206994597c13d831ec7,86 + 0xdac17f958d2ee523a2206206994597c13d831ec7,89 + 0xdac17f958d2ee523a2206206994597c13d831ec7,108 + 0xdac17f958d2ee523a2206206994597c13d831ec7,111 + 0xdac17f958d2ee523a2206206994597c13d831ec7,112 + 0xdac17f958d2ee523a2206206994597c13d831ec7,125 + 0xdac17f958d2ee523a2206206994597c13d831ec7,132 + 0xdac17f958d2ee523a2206206994597c13d831ec7,133 + 0xdac17f958d2ee523a2206206994597c13d831ec7,172 + 0xdac17f958d2ee523a2206206994597c13d831ec7,208 + 0xdac17f958d2ee523a2206206994597c13d831ec7,209 +", + "unexpected stdout:\n{out}", + ); + + assert_eq!(out.stderr_template(), "", "unexpected stderr:\n{out}"); +} + +/// Block-only selection with no log/transaction filter. HyperSync returns rows +/// only for matching selections, so without `include_all_blocks` the response is +/// empty — this guards the regression where `envio data block.` returned +/// nothing. 
+#[test] +fn block_only_selection_returns_block_data() { + if skip_without_token() { + return; + } + let out = envio_data(&[ + "block.number", + "block.gasUsed", + "--chain=1", + "--where={ block: { number: 20000000 } }", + ]); + assert!(out.ok, "envio data failed:\n{out}"); + + assert_eq!( + out.stdout, + "\ +blocks[1]{number,gasUsed}: + 20000000,11089692 +", + "unexpected stdout:\n{out}", + ); + + assert_eq!(out.stderr_template(), "", "unexpected stderr:\n{out}"); +} + +/// Deterministic large range that hypersync cannot return in one batch. +/// Verifies the executor prints the "next page" hint and echoes back the +/// original chain input plus the unchanged upper bound. +#[test] +fn paginates_when_range_exceeds_one_batch() { + if skip_without_token() { + return; + } + let out = envio_data(&[ + "block.number", + "log.srcAddress", + "--chain=1", + "--where={ block: { number: { _gte: 18000000, _lte: 19000000 } }, log: { srcAddress: \"0xdAC17F958D2ee523a2206206994597C13D831ec7\" } }", + ]); + assert!(out.ok, "envio data failed:\n{out}"); + + let expected = [ + "Got a response up to block . To get the next page, run the following command:", + " envio data block.number log.srcAddress \\", + " --chain= \\", + " --where='{ block: { number: { _gte: , _lte: } }, log: { srcAddress: \"0xdAC17F958D2ee523a2206206994597C13D831ec7\" } }'", + ] + .join("\n"); + assert_eq!(out.stderr_template(), expected, "unexpected stderr:\n{out}"); +}