Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cc93ca1
driver: parallel unsupported order detection
metalurgical Apr 20, 2026
889f437
driver(tests): unsupported_order_uids test cases
metalurgical Apr 22, 2026
ae060d2
lint: run cargo fmt
metalurgical Apr 22, 2026
c5cf81d
refactor
metalurgical Apr 29, 2026
8ae2e9b
refactor: implement trait to cover simulation branch in test
metalurgical Apr 29, 2026
144212f
fix: lint
metalurgical Apr 30, 2026
8375c44
fix: lint
metalurgical Apr 30, 2026
13d0be9
Merge branch 'main' into fix_chore_3516
metalurgical May 1, 2026
5ad8f6b
update: use tokio::spawn to schedule filtering and liquidity fetching…
metalurgical May 1, 2026
b56b2b9
review: address comments
metalurgical May 4, 2026
6045c8b
refactor: simplify test
metalurgical May 5, 2026
ede7489
review: address additional review comments
metalurgical May 5, 2026
efc6d90
fix: comment
metalurgical May 5, 2026
e01df58
fix: comment
metalurgical May 5, 2026
9dac353
Merge branch 'main' into fix_chore_3516
metalurgical May 5, 2026
5707a99
Merge branch 'main' into fix_chore_3516
metalurgical May 6, 2026
c06963a
refactor: separate concerns in without_unsupported_orders
metalurgical May 6, 2026
60e766f
update
metalurgical May 7, 2026
f41f0d9
Merge branch 'main' into fix_chore_3516
metalurgical May 8, 2026
e954f2c
update: flashloan test for quote changes
metalurgical May 9, 2026
035b6bf
fix: flashloan quote test names
metalurgical May 9, 2026
9aa8acc
Merge branch 'main' into fix_chore_3516
metalurgical May 11, 2026
e17a6ae
Merge branch 'main' into fix_chore_3516
metalurgical May 11, 2026
405dfc9
Merge branch 'main' into fix_chore_3516
metalurgical May 12, 2026
be82231
Merge branch 'main' into fix_chore_3516
metalurgical May 16, 2026
e9ce935
Merge branch 'main' into fix_chore_3516
metalurgical May 21, 2026
b6293f1
Merge branch 'main' into fix_chore_3516
metalurgical May 22, 2026
0e22fb0
Merge branch 'main' into fix_chore_3516
metalurgical May 24, 2026
fcc1f50
Merge branch 'main' into fix_chore_3516
metalurgical May 26, 2026
4f06588
Merge branch 'main' into fix_chore_3516
metalurgical May 27, 2026
01b1703
Merge branch 'main' into fix_chore_3516
metalurgical May 29, 2026
3fab09c
Merge branch 'main' into fix_chore_3516
metalurgical May 30, 2026
f9e8b33
Merge branch 'main' into fix_chore_3516
metalurgical Jun 2, 2026
49c0f66
Merge branch 'main' into fix_chore_3516
metalurgical Jun 5, 2026
21ea1fb
Merge branch 'main' into fix_chore_3516
metalurgical Jun 9, 2026
58ac443
Merge branch 'main' into fix_chore_3516
metalurgical Jun 9, 2026
09609b1
Merge branch 'main' into fix_chore_3516
metalurgical Jun 15, 2026
65614c8
Merge remote-tracking branch 'upstream/main' into fix_chore_3516
metalurgical Jun 20, 2026
4135259
update
metalurgical Jun 20, 2026
53514c9
cargo-fmt
metalurgical Jun 20, 2026
5f1f50e
add abort guard
metalurgical Jun 20, 2026
ed1decb
update
metalurgical Jun 20, 2026
93f0024
Merge branch 'main' into fix_chore_3516
metalurgical Jun 25, 2026
d7fdb55
Merge branch 'main' into fix_chore_3516
metalurgical Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/balance-overrides/src/balance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ mod tests {

let strategy = detector.detect(NATIVE_ETH, user).await.unwrap();

std::assert_matches!(strategy, Strategy::NativeEth);
std::assert_eq!(strategy, Strategy::NativeEth);

// ETH is not an ERC20 token so we can't do an `eth_call` on `balanceOf()`.
// Additionally `eth_getBalance` does not support state overrides.
Expand Down
1 change: 1 addition & 0 deletions crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ app-data = { workspace = true, features = ["test_helpers"] }
contracts = { workspace = true }
ethrpc = { workspace = true, features = ["test-util"] }
maplit = { workspace = true }
mockall = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["process", "test-util"] }

Expand Down
78 changes: 57 additions & 21 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use {
time::Instant,
},
tokio::{sync::mpsc, task},
tracing::{Instrument, instrument},
tracing::Instrument,
};

pub mod auction;
Expand Down Expand Up @@ -259,6 +259,22 @@ impl Drop for SettleTaskHandle {
}
}

struct AbortGuard(Vec<task::AbortHandle>);

impl AbortGuard {
fn disarm(&mut self) {
self.0.clear();
}
}

impl Drop for AbortGuard {
fn drop(&mut self) {
for handle in &self.0 {
handle.abort();
}
}
}

#[derive(Debug)]
pub struct Competition {
pub solver: Solver,
Expand Down Expand Up @@ -330,15 +346,44 @@ impl Competition {
Error::MalformedRequest
})?;

let auction = self.assemble_auction(&tasks).await;
let mut auction = self.assemble_auction(&tasks).await;

let liquidity = async {
match self.solver.liquidity() {
solver::Liquidity::Fetch => tasks.liquidity.await,
solver::Liquidity::Skip => Arc::new(Vec::new()),
// We can run bad token filtering and liquidity fetching in parallel
let risk_detector = self.risk_detector.clone();
let flashloans_enabled = self.solver.config().flashloans_enabled;
let liquidity_mode = self.solver.liquidity();
let auction_handle = tokio::spawn(
async move {
risk_detector
.without_unsupported_orders(&mut auction.orders, flashloans_enabled)
.await;
auction
}
}
.await;
.in_current_span(),
);
let liquidity_handle = tokio::spawn(
async move {
match liquidity_mode {
solver::Liquidity::Fetch => tasks.liquidity.await,
solver::Liquidity::Skip => Arc::new(Vec::new()),
}
}
.in_current_span(),
);
let mut drop_guard = AbortGuard(vec![
auction_handle.abort_handle(),
liquidity_handle.abort_handle(),
]);
let (auction, liquidity) = tokio::join!(auction_handle, liquidity_handle);
drop_guard.disarm();
let auction = auction.map_err(|err| {
tracing::error!(?err, "order filtering task failed");
Error::InternalError(err.to_string())
})?;
let liquidity = liquidity.map_err(|err| {
tracing::error!(?err, "liquidity fetch task failed");
Error::InternalError(err.to_string())
})?;

let elapsed = start.elapsed();
metrics::get()
Expand Down Expand Up @@ -634,14 +679,13 @@ impl Competition {
tasks.app_data.clone()
);

let auction = Self::run_blocking_with_timer("update_orders", move || {
Self::run_blocking_with_timer("update_orders", move || {
// Same as before with sort_orders, we use spawn_blocking() because a lot of CPU
// bound computations are happening and we want to avoid blocking
// the runtime.
Self::update_orders(auction, balances, app_data, cow_amm_orders)
})
.await;
self.without_unsupported_orders(auction).await
.await
}

// Oders already need to be sorted from most relevant to least relevant so that
Expand Down Expand Up @@ -970,16 +1014,6 @@ impl Competition {
}
Ok(())
}

#[instrument(skip_all)]
async fn without_unsupported_orders(&self, mut auction: Auction) -> Auction {
if !self.solver.config().flashloans_enabled {
auction.orders.retain(|o| o.app_data.flashloan().is_none());
}
self.risk_detector
.filter_unsupported_orders_in_auction(auction)
.await
}
}

const MAX_SOLUTIONS_TO_MERGE: usize = 10;
Expand Down Expand Up @@ -1101,4 +1135,6 @@ pub enum Error {
NoValidOrdersFound,
#[error("could not parse the request")]
MalformedRequest,
#[error("internal error: {0}")]
InternalError(String),
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use {
infra::{self, observe::metrics},
},
bad_tokens::{TokenQuality, trace_call::TraceCallDetectorRaw},
eth_domain_types as eth,
futures::FutureExt,
model::interaction::InteractionData,
request_sharing::BoxRequestSharing,
std::{
ops::Deref,
sync::Arc,
time::{Duration, Instant},
},
Expand All @@ -21,6 +23,29 @@ use {
#[derive(Clone)]
pub struct Detector(Arc<Inner>);

#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait DetectorApi: Send + Sync {
async fn determine_sell_token_quality(&self, order: &Order, now: Instant) -> Quality;
fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Quality;
fn evict_outdated_entries(&self);
}

#[async_trait::async_trait]
impl DetectorApi for Detector {
async fn determine_sell_token_quality(&self, order: &Order, now: Instant) -> Quality {
self.determine_sell_token_quality(order, now).await
}

fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Quality {
Deref::deref(self).get_quality(token, now)
}

fn evict_outdated_entries(&self) {
Deref::deref(self).evict_outdated_entries()
}
}

struct Inner {
cache: Cache,
detector: TraceCallDetectorRaw,
Expand Down Expand Up @@ -107,7 +132,7 @@ impl Detector {
}
}

impl std::ops::Deref for Detector {
impl Deref for Detector {
type Target = Cache;

fn deref(&self) -> &Self::Target {
Expand Down
Loading
Loading