diff --git a/Cargo.lock b/Cargo.lock index 5c34735afc..5efc3ebd04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9632,6 +9632,7 @@ dependencies = [ "bytes", "dashmap 6.1.0", "derive_more 1.0.0", + "futures", "solana-client", "solana-sdk", "thiserror 1.0.69", diff --git a/crates/solana-indexer/Cargo.toml b/crates/solana-indexer/Cargo.toml index b4333d83a4..a099fd0355 100644 --- a/crates/solana-indexer/Cargo.toml +++ b/crates/solana-indexer/Cargo.toml @@ -20,6 +20,7 @@ async-trait = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } derive_more = { workspace = true } +futures = { workspace = true } solana-client = { workspace = true } solana-sdk = { workspace = true } thiserror = { workspace = true } diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 94e92a69ca..739991f19e 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -1,63 +1,387 @@ -#![expect(dead_code)] -//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast -//! as yellowstone delivers, pushes tagged updates into the channel, and updates -//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding. - -// TODO: This file only declares the component skeleton. The `run` body is -// `unimplemented!`; the actual drain and reconnect with backoff logic arrives -// in a later change. +//! The ingester drains the yellowstone gRPC stream as fast as it delivers, +//! pushes tagged updates into the channel, and advances the latest-chain-slot +//! counter on every slot-filter message. It performs no decoding. +//! +//! The stream it drains is an `AutoReconnect`-backed +//! [`GeyserStream`](yellowstone_grpc_client::GeyserStream) from +//! `yellowstone-grpc-client`: reconnects, backoff, keepalive, and +//! resume-from-checkpoint are handled inside that stream and never surface +//! here. The ingester's [`Ingester::run`] loop therefore has no backoff of its +//! own; it returns when the stream ends (the wrapper gave up on an +//! unrecoverable error) or when the decoder hangs up. +//! +//! [`Ingester::serve`] is the production entrypoint — the "actual caller" — +//! that builds the subscription request, resumes from the persisted watermark, +//! opens the `GeyserStream`, and runs the drain loop. It expects the +//! [`GeyserGrpcClient`] it receives to have been built with a reconnect config +//! (via `set_reconnect_config`), otherwise the `AutoReconnect` wrapper won't +//! actually reconnect. use { - crate::{traits::store::Store, types::shared::StreamUpdate}, - std::sync::{Arc, atomic::AtomicU64}, - tokio::sync::mpsc::Sender, - yellowstone_grpc_client::GrpcConnector, + crate::{ + traits::store::Store, + types::{ + Signature, + errors::StoreError, + shared::StreamUpdate, + slot::Slot, + wire::{ + CommitmentLevel, + SubscribeRequest, + SubscribeRequestFilterAccounts, + SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + UpdateOneof, + }, + }, + }, + futures::stream::{Stream, StreamExt}, + solana_sdk::pubkey::Pubkey, + std::{ + ops::ControlFlow, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + }, + tokio::sync::mpsc::{Sender, error::TrySendError}, + yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserStream}, + yellowstone_grpc_proto::tonic::Status, }; -/// The sole writer is the ingester, on every slot-filter message. Anchors the -/// partial-event watchdog and the finalization worker. Cold start is zero; the -/// watchdog skips its comparison on the first tick. -/// -/// This is the chain tip, not indexing progress. How far the indexer has -/// actually persisted is the watermark in `solana.indexer_state`, written by -/// the decoder, which is a separate value. -pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); - -/// Cap on the exponential backoff between reconnect attempts. -pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); - /// Capacity of the channel from the ingester to the decoder. pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; /// Ingester component. /// -/// Generic over a `GrpcConnector` implementor so the unit tests can drive it -/// with a mock. -pub(crate) struct Ingester { - /// gRPC connector implementor - pub connector: C, +/// Generic over the update `Stream` so unit tests can drive it with a mock. +/// Production wires this to an `AutoReconnect`-backed `GeyserStream` via +/// [`Ingester::serve`]. +/// +/// `Ping`/`Pong` frames are ignored: the library passes them through, but they +/// carry no data the ingester needs, and answering server pings is not part of +/// the drain path. +pub(crate) struct Ingester +where + S: Stream> + Unpin + Send, +{ + /// The yellowstone update stream. Expected to be `AutoReconnect`-backed in + /// production, so reconnects happen inside the stream and never surface to + /// the drain loop. + pub stream: S, /// Sends `StreamUpdate` to the decoder. Should be bounded to /// `INGEST_TO_DECODER_CAPACITY` entries. pub tx: Sender, - /// Store implementor; used to checkpoint the slot. - pub store: Arc, + /// Latest chain slot seen on the slot filter. The ingester is the sole + /// writer. The `Arc` is taken from the caller so the watchdog and the + /// finalization worker can share it as a read handle once they are wired + /// up; neither reads it yet. Cold start is zero (`AtomicU64::default`). + pub latest_chain_slot: Arc, } -impl Ingester { - /// Construct a new ingester. The caller owns the channel capacity decision. - pub fn new(connector: C, tx: Sender, store: Arc) -> Self { +impl Ingester +where + S: Stream> + Unpin + Send, +{ + /// Construct a new ingester over an already-open update stream. The caller + /// supplies `latest_chain_slot` so it can share the same `Arc` + /// with the partial-event watchdog and the finalization worker, and reuse + /// it across restarts. The caller also owns building the stream, the + /// subscription request, the resume slot, and the reconnect policy that + /// come with it. Production wiring lives in [`Ingester::serve`]. + pub fn new(stream: S, tx: Sender, latest_chain_slot: Arc) -> Self { Self { - connector, + stream, + tx, + latest_chain_slot, + } + } + + /// Drain the update stream until it ends or the decoder hangs up. + /// + /// Recoverable stream errors never reach this loop: the `AutoReconnect` + /// wrapper handles them internally. Returns `Ok(())` when the decoder + /// dropped its receiver (clean shutdown), or [`Err(Error)`] when the stream + /// ended terminally (the wrapper gave up on an unrecoverable error, or the + /// stream closed). + pub async fn run(&mut self) -> Result<(), Error> { + while let Some(update) = self.stream.next().await { + match update { + Ok(update) => { + if Self::handle_update(&self.tx, &self.latest_chain_slot, update) + .await + .is_break() + { + tracing::info!("decoder channel closed; ingester stopping"); + return Ok(()); + } + } + Err(status) => { + tracing::warn!(%status, "yellowstone stream error; ingester stopping"); + return Err(Error::Stream(status)); + } + } + } + tracing::info!("yellowstone stream ended; ingester stopping"); + Err(Error::StreamEnded) + } + + /// Dispatch one wire message. Breaks when the decoder is gone. + /// + /// Associated function taking the channel and chain-tip counter by + /// reference rather than `&self`, so the future borrows only those (both + /// `Sync`) fields across awaits. That keeps `run`'s future `Send` without + /// requiring `Ingester: Sync` — the `GeyserStream` field is `Send` but not + /// `Sync`. + async fn handle_update( + tx: &Sender, + latest_chain_slot: &AtomicU64, + update: SubscribeUpdate, + ) -> ControlFlow<()> { + use UpdateOneof::*; + + let Some(update) = update.update_oneof else { + tracing::warn!("update without a payload"); + return ControlFlow::Continue(()); + }; + match update { + Transaction(tx_msg) => Self::handle_transaction(tx, tx_msg).await, + Account(account) => Self::handle_account(tx, account).await, + Slot(slot) => Self::handle_slot(latest_chain_slot, slot).await, + + // Ping/Pong frames carry no data the ingester needs; the library passes them through, + // and we drop them here. + Ping(_) | Pong(_) => ControlFlow::Continue(()), + + // Not part of our subscription; irrelevant to the ingester even if the provider sends + // them. + TransactionStatus(_) | Block(_) | BlockMeta(_) | Entry(_) => ControlFlow::Continue(()), + } + } + + /// Forward a transaction update to the decoder, skipping frames without a + /// body or with a malformed signature. + async fn handle_transaction( + tx: &Sender, + tx_msg: SubscribeUpdateTransaction, + ) -> ControlFlow<()> { + let Some(inner) = tx_msg.transaction else { + tracing::warn!(slot = tx_msg.slot, "transaction update without a body"); + return ControlFlow::Continue(()); + }; + let Ok(signature) = Signature::try_from(inner.signature.as_slice()) else { + tracing::warn!( + slot = tx_msg.slot, + "transaction update with a malformed signature" + ); + return ControlFlow::Continue(()); + }; + Self::forward( + tx, + StreamUpdate::Tx { + slot: Slot(tx_msg.slot), + signature, + inner: Box::new(inner), + }, + ) + .await + } + + /// Forward an account update to the decoder, skipping frames without a + /// body. + async fn handle_account( + tx: &Sender, + account: SubscribeUpdateAccount, + ) -> ControlFlow<()> { + let Some(inner) = account.account else { + tracing::warn!(slot = account.slot, "account update without a body"); + return ControlFlow::Continue(()); + }; + let txn_signature = inner + .txn_signature + .as_deref() + .and_then(|bytes| Signature::try_from(bytes).ok()); + Self::forward( tx, - store, + StreamUpdate::Account { + slot: Slot(account.slot), + txn_signature, + inner: Box::new(inner), + }, + ) + .await + } + + /// Consume a slot message: advance the in-memory chain-tip counter. Slot + /// messages never enter the channel, so this always continues. + async fn handle_slot( + latest_chain_slot: &AtomicU64, + slot: SubscribeUpdateSlot, + ) -> ControlFlow<()> { + latest_chain_slot.fetch_max(slot.slot, Ordering::Relaxed); + ControlFlow::Continue(()) + } + + /// Push one update into the decoder channel. A full channel is the intended + /// overload signal: warn once, then block until the decoder drains. Breaks + /// when the decoder dropped its receiver. + async fn forward(tx: &Sender, update: StreamUpdate) -> ControlFlow<()> { + match tx.try_send(update) { + Ok(()) => ControlFlow::Continue(()), + Err(TrySendError::Full(update)) => { + // TODO: Rate-limit if sustained backpressure floods logs. + tracing::warn!("decoder channel full; ingester blocked on backpressure"); + match tx.send(update).await { + Ok(()) => ControlFlow::Continue(()), + Err(_) => ControlFlow::Break(()), + } + } + Err(TrySendError::Closed(_)) => ControlFlow::Break(()), } } +} + +/// Why the ingester stopped. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// The persisted watermark could not be read. + #[error("failed to read the resume watermark: {0}")] + Store(#[from] StoreError), + /// The yellowstone subscription could not be opened. + #[error("failed to open the yellowstone subscription: {0}")] + Subscribe(#[from] GeyserGrpcClientError), + /// The stream returned a terminal gRPC error — the `AutoReconnect` wrapper + /// gave up on an unrecoverable failure. + #[error("yellowstone stream error: {0}")] + Stream(#[from] Status), + /// The stream ended without an error — the `AutoReconnect` wrapper stopped. + #[error("yellowstone stream ended")] + StreamEnded, +} + +impl Ingester { + /// Production entrypoint: build the subscription request, resume from the + /// persisted watermark, open an `AutoReconnect`-backed `GeyserStream`, and + /// run the drain loop. + /// + /// The initial `from_slot` is `watermark + 1`, or `None` on a cold start + /// (the provider subscribes from the live tip). Reconnect `from_slot` is + /// driven by the `AutoReconnect` wrapper's `BlockMeta` checkpoint, not this + /// method. + /// + /// Returns `Ok(())` on a clean shutdown (the decoder dropped its receiver), + /// or `Err(Error)` if setup failed or the stream ended terminally. The + /// client is consumed and dropped with the ingester. + /// + /// `latest_chain_slot` is taken from the caller so the same `Arc` can be + /// shared with the watchdog and finalization worker and reused across + /// restarts. + pub async fn serve( + mut client: GeyserGrpcClient, + tx: Sender, + store: St, + latest_chain_slot: Arc, + settlement_program: Pubkey, + solflow_program: Pubkey, + ) -> Result<(), Error> { + let request = subscribe_request(settlement_program, solflow_program); + let from_slot = store.read_watermark().await?.map(|watermark| watermark + 1); + let request = SubscribeRequest { + from_slot, + ..request + }; - /// TODO: Outer loop: open the subscription, drain it, push into the - /// channel, reconnect on failure with exponential backoff. - pub async fn run(&mut self) { - unimplemented!() + // The sink is the bidi request half: if kept, it can reconfigure the + // subscription at runtime (add/remove a tracked program, change commitment, + // narrow filters). Not used for this puprose at this time, but worth + // considering in case our indexing requirements get more dynamic. + let (_sink, stream) = client.subscribe_with_request(Some(request)).await?; + + let mut ingester = Ingester::new(stream, tx, latest_chain_slot); + ingester.run().await?; + Ok(()) + } +} + +/// Temporary compile-time proof that [`Ingester::serve`]'s future is `Send`. +/// +/// Keep this only until a real `tokio::spawn(Ingester::serve(...))` call site +/// lands; the actual spawn is the better check. Delete this helper then. +#[allow(dead_code)] +fn assert_serve_future_is_send( + client: GeyserGrpcClient, + tx: Sender, + store: St, + latest_chain_slot: Arc, + settlement_program: Pubkey, + solflow_program: Pubkey, +) { + fn is_send(_: F) {} + is_send(Ingester::serve( + client, + tx, + store, + latest_chain_slot, + settlement_program, + solflow_program, + )); +} + +/// The wire-level filter shape: the four named program filters and the +/// `chain_tip` slot filter, multiplexed into a single subscription at +/// `confirmed` commitment. `from_slot` is left unset; [`Ingester::serve`] fills +/// it in from the persisted watermark before subscribing. +/// +/// The library auto-adds a `BlockMeta` + `slot` filter (under its +/// `__autoreconnect` key) so the `AutoReconnect` wrapper can checkpoint and +/// resume on reconnect; those messages are consumed inside the wrapper and +/// never reach the ingester. +/// +/// TODO: source the exact subscriptions from a config file once this crate's +/// configuration module lands. +fn subscribe_request(settlement_program: Pubkey, solflow_program: Pubkey) -> SubscribeRequest { + // `failed: None` includes failed transactions: the failure itself is the + // on-chain signal downstream consumers read. + let transactions = |program: Pubkey| SubscribeRequestFilterTransactions { + vote: Some(false), + failed: None, + account_include: vec![program.to_string()], + ..Default::default() + }; + let accounts = |program: Pubkey| SubscribeRequestFilterAccounts { + owner: vec![program.to_string()], + ..Default::default() + }; + SubscribeRequest { + transactions: [ + ( + "settlement_txs".to_owned(), + transactions(settlement_program), + ), + ("sol_flow_txs".to_owned(), transactions(solflow_program)), + ] + .into(), + accounts: [ + ("settlement_owned".to_owned(), accounts(settlement_program)), + ("sol_flow_owned".to_owned(), accounts(solflow_program)), + ] + .into(), + slots: [( + "chain_tip".to_owned(), + SubscribeRequestFilterSlots { + // one message per slot at the subscription's commitment level + filter_by_commitment: Some(true), + ..Default::default() + }, + )] + .into(), + commitment: Some(CommitmentLevel::Confirmed as i32), + ..Default::default() } } diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs index ef1f408207..342a6f975e 100644 --- a/crates/solana-indexer/src/indexer/watchdog.rs +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -17,15 +17,12 @@ use { std::sync::Arc, }; -#[allow(unused_imports)] -use crate::indexer::ingester::LATEST_CHAIN_SLOT; - /// Partial-event watchdog component. /// /// The watchdog holds a view of the partial-event map the decoder mutates. /// /// Every 500 ms it scans the map and gives up on any partial more than 32 slots -/// behind `LATEST_CHAIN_SLOT`. +/// behind the ingester's latest-chain-slot counter. /// /// Those entries are flushed to `solana.dead_letter` with a reason of /// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was diff --git a/crates/solana-indexer/src/types/commitment.rs b/crates/solana-indexer/src/types/commitment.rs index 07cf688514..c0c95ac05e 100644 --- a/crates/solana-indexer/src/types/commitment.rs +++ b/crates/solana-indexer/src/types/commitment.rs @@ -69,7 +69,8 @@ pub(crate) struct AccountInfo { /// A `solana.*` row that has not yet reached `finalized` commitment — the kind /// picked up by the aged-row sweep, where `commitment = 'confirmed'` and the -/// row's slot is at least one finalization window behind `LATEST_CHAIN_SLOT`. +/// row's slot is at least one finalization window behind the latest chain +/// slot. #[derive(Debug, Clone)] pub(crate) struct UnfinalizedRow { /// Table the row lives in. diff --git a/crates/solana-indexer/src/types/errors.rs b/crates/solana-indexer/src/types/errors.rs index 27607bc304..568462788a 100644 --- a/crates/solana-indexer/src/types/errors.rs +++ b/crates/solana-indexer/src/types/errors.rs @@ -43,8 +43,8 @@ pub(crate) enum StreamError { #[error("stream send timeout")] SendTimeout, /// The resume slot is outside the provider's replay window. The caller - /// should reset `from_slot` to `LATEST_CHAIN_SLOT − replay_window`, - /// record the lost range, and retry the subscription. + /// should reset `from_slot` to the latest chain slot minus the replay + /// window, record the lost range, and retry the subscription. #[error( "replay window exceeded: attempted slot {attempted_slot}, earliest replayable \ {earliest_replayable_slot}" diff --git a/crates/solana-indexer/src/types/wire.rs b/crates/solana-indexer/src/types/wire.rs index 8db1bf55b6..c52e94703a 100644 --- a/crates/solana-indexer/src/types/wire.rs +++ b/crates/solana-indexer/src/types/wire.rs @@ -3,7 +3,22 @@ //! Re-exports of the `yellowstone-grpc-proto` message types the indexer //! consumes as its wire-format surface. pub use yellowstone_grpc_proto::{ - geyser::{SubscribeUpdateAccountInfo, SubscribeUpdateTransactionInfo}, + geyser::{ + CommitmentLevel, + SlotStatus, + SubscribeRequest, + SubscribeRequestFilterAccounts, + SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, + SubscribeUpdatePing, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + SubscribeUpdateTransactionInfo, + subscribe_update::UpdateOneof, + }, solana::storage::confirmed_block::{ CompiledInstruction, InnerInstructions,