Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics and observations in autopilot runloop #3039

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {

type SolutionId = u64;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Solution {
id: SolutionId,
solver: eth::Address,
Expand Down
8 changes: 4 additions & 4 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ pub async fn run(args: Arguments) {
hardcoded: args.trusted_tokens.unwrap_or_default(),
};
// updated in background task
let market_makable_token_list =
let trusted_tokens =
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let mut maintenance = Maintenance::new(
Expand Down Expand Up @@ -546,7 +546,7 @@ pub async fn run(args: Arguments) {
})
.collect(),
solvable_orders_cache,
market_makable_token_list,
trusted_tokens,
liveness.clone(),
Arc::new(maintenance),
);
Expand All @@ -566,11 +566,11 @@ async fn shadow_mode(args: Arguments) -> ! {
.drivers
.into_iter()
.map(|driver| {
infra::Driver::new(
Arc::new(infra::Driver::new(
driver.url,
driver.name,
driver.fairness_threshold.map(Into::into),
)
))
})
.collect();

Expand Down
170 changes: 98 additions & 72 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use {
rand::seq::SliceRandom,
shared::token_list::AutoUpdatingTokenList,
std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
Expand All @@ -61,7 +61,7 @@ pub struct RunLoop {
persistence: infra::Persistence,
drivers: Vec<Arc<infra::Driver>>,
solvable_orders_cache: Arc<SolvableOrdersCache>,
market_makable_token_list: AutoUpdatingTokenList,
trusted_tokens: AutoUpdatingTokenList,
in_flight_orders: Arc<Mutex<HashSet<OrderUid>>>,
liveness: Arc<Liveness>,
/// Maintenance tasks that should run before every runloop to have
Expand All @@ -80,7 +80,7 @@ impl RunLoop {
persistence: infra::Persistence,
drivers: Vec<Arc<infra::Driver>>,
solvable_orders_cache: Arc<SolvableOrdersCache>,
market_makable_token_list: AutoUpdatingTokenList,
trusted_tokens: AutoUpdatingTokenList,
liveness: Arc<Liveness>,
maintenance: Arc<Maintenance>,
) -> Self {
Expand All @@ -97,7 +97,7 @@ impl RunLoop {
persistence,
drivers,
solvable_orders_cache,
market_makable_token_list,
trusted_tokens,
in_flight_orders: Default::default(),
liveness,
maintenance,
Expand Down Expand Up @@ -245,7 +245,25 @@ impl RunLoop {

let auction = self.remove_in_flight_orders(auction).await;

// Mark all auction orders as `Ready` for competition
self.persistence.store_order_events(
auction.orders.iter().map(|o| OrderUid(o.uid.0)),
OrderEventLabel::Ready,
);

// Collect valid solutions from all drivers
let solutions = self.competition(&auction).await;
observe::solutions(&solutions);

// Mark all solved orders as `Considered` for execution
self.persistence.store_order_events(
solutions
.iter()
.flat_map(|s| s.solution.order_ids().copied()),
OrderEventLabel::Considered,
);

// Pick winners for execution
let winners = self.select_winners(&solutions);
if winners.is_empty() {
tracing::info!("no winners for auction");
Expand Down Expand Up @@ -273,9 +291,14 @@ impl RunLoop {
return;
}

observe::unsettled(&solutions, &winners, &auction);
for Participant { driver, solution } in winners {
tracing::info!(driver = %driver.name, solution = %solution.id(), "winner");

// Mark all winning orders as `Executing`
self.persistence
.store_order_events(solution.order_ids().copied(), OrderEventLabel::Executing);

self.start_settlement_execution(
auction.id,
single_run_start,
Expand Down Expand Up @@ -383,7 +406,7 @@ impl RunLoop {
auction: &domain::Auction,
competition_simulation_block: u64,
winning_solution: &competition::Solution,
solutions: &VecDeque<Participant>,
solutions: &[Participant],
block_deadline: u64,
) -> Result<()> {
let start = Instant::now();
Expand Down Expand Up @@ -501,18 +524,14 @@ impl RunLoop {

/// Runs the solver competition, making all configured drivers participate.
/// Returns all fair solutions sorted by their score (best to worst).
async fn competition(&self, auction: &domain::Auction) -> VecDeque<Participant> {
async fn competition(&self, auction: &domain::Auction) -> Vec<Participant> {
let request = solve::Request::new(
auction,
&self.market_makable_token_list.all(),
&self.trusted_tokens.all(),
self.config.solve_deadline,
);
let request = &request;

let order_uids = auction.orders.iter().map(|o| OrderUid(o.uid.0));
self.persistence
.store_order_events(order_uids, OrderEventLabel::Ready);

let mut solutions = futures::future::join_all(
self.drivers
.iter()
Expand All @@ -529,16 +548,22 @@ impl RunLoop {
std::cmp::Reverse(participant.solution.score().get().0)
});

// Make sure the winning solution is fair.
let mut solutions = solutions.into_iter().collect::<VecDeque<_>>();
while !Self::is_solution_fair(solutions.front(), &solutions, auction) {
let unfair_solution = solutions.pop_front().expect("must exist");
tracing::warn!(
invalidated = unfair_solution.driver.name,
"fairness check invalidated of solution"
);
}
self.report_on_solutions(&solutions, auction);
// Filter out solutions that are not fair
let solutions = solutions
.iter()
.enumerate()
.filter_map(|(index, participant)| {
if Self::is_solution_fair(participant, &solutions[index..], auction) {
Some(participant.clone())
} else {
tracing::warn!(
invalidated = participant.driver.name,
"fairness check invalidated of solution"
);
None
}
})
.collect();

solutions
}
Expand All @@ -551,7 +576,7 @@ impl RunLoop {
/// until `max_winners_per_auction` are selected. The solution is a winner
/// if it swaps tokens that are not yet swapped by any other already
/// selected winner.
fn select_winners<'a>(&self, participants: &'a VecDeque<Participant>) -> Vec<&'a Participant> {
fn select_winners<'a>(&self, participants: &'a [Participant]) -> Vec<&'a Participant> {
let mut winners = Vec::new();
let mut already_swapped_tokens = HashSet::new();
for participant in participants.iter() {
Expand All @@ -572,58 +597,12 @@ impl RunLoop {
winners
}

/// Records metrics, order events and logs for the given solutions.
/// Expects the winning solution to be the first in the list.
fn report_on_solutions(&self, solutions: &VecDeque<Participant>, auction: &domain::Auction) {
let Some(winner) = solutions.front() else {
// no solutions means nothing to report
return;
};

solutions.iter().for_each(|solution| {
tracing::debug!(
driver=%solution.driver.name,
orders=?solution.solution.order_ids(),
solution=solution.solution.id(),
"proposed solution"
);
});

let proposed_orders: HashSet<_> = solutions
.iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = solutions
.front()
.into_iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let mut non_winning_orders: HashSet<_> = proposed_orders
.difference(&winning_orders)
.cloned()
.collect();
self.persistence.store_order_events(
non_winning_orders.iter().cloned(),
OrderEventLabel::Considered,
);
self.persistence
.store_order_events(winning_orders, OrderEventLabel::Executing);

let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

// Report orders that were part of a non-winning solution candidate
// but only if they were part of the auction (filter out jit orders)
non_winning_orders.retain(|uid| auction_uids.contains(uid));
Metrics::matched_unsettled(&winner.driver, non_winning_orders);
}

/// Returns true if winning solution is fair or winner is None
/// Returns true if winning solution is fair
fn is_solution_fair(
winner: Option<&Participant>,
remaining: &VecDeque<Participant>,
winner: &Participant,
remaining: &[Participant],
auction: &domain::Auction,
) -> bool {
let Some(winner) = winner else { return true };
let Some(fairness_threshold) = winner.driver.fairness_threshold else {
return true;
};
Expand Down Expand Up @@ -897,7 +876,8 @@ impl RunLoop {
}
}

struct Participant {
#[derive(Clone)]
pub struct Participant {
driver: Arc<infra::Driver>,
solution: competition::Solution,
}
Expand Down Expand Up @@ -1103,4 +1083,50 @@ pub mod observe {
"Orders no longer in auction"
);
}

pub fn solutions(solutions: &[super::Participant]) {
if solutions.is_empty() {
tracing::info!("no solutions for auction");
}
for participant in solutions {
tracing::debug!(
driver = %participant.driver.name,
orders = ?participant.solution.order_ids(),
solution = %participant.solution.id(),
"proposed solution"
);
}
}

/// Records metrics for the matched but unsettled orders.
pub fn unsettled(
solutions: &[super::Participant],
winners: &[&super::Participant],
auction: &domain::Auction,
) {
let Some(winner) = winners.first() else {
// no solutions means nothing to report
return;
};

let proposed_orders: HashSet<_> = solutions
.iter()
.flat_map(|participant| participant.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = winners
.iter()
.flat_map(|participant| participant.solution.order_ids().copied())
.collect();
let mut non_winning_orders: HashSet<_> = proposed_orders
.difference(&winning_orders)
.cloned()
.collect();

let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

// Report orders that were part of a non-winning solution candidate
// but only if they were part of the auction (filter out jit orders)
non_winning_orders.retain(|uid| auction_uids.contains(uid));
super::Metrics::matched_unsettled(&winner.driver, non_winning_orders);
}
}
4 changes: 2 additions & 2 deletions crates/autopilot/src/shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use {

pub struct RunLoop {
orderbook: infra::shadow::Orderbook,
drivers: Vec<infra::Driver>,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
auction: domain::auction::Id,
block: u64,
Expand All @@ -50,7 +50,7 @@ impl RunLoop {
#[allow(clippy::too_many_arguments)]
pub fn new(
orderbook: infra::shadow::Orderbook,
drivers: Vec<infra::Driver>,
drivers: Vec<Arc<infra::Driver>>,
trusted_tokens: AutoUpdatingTokenList,
solve_deadline: Duration,
liveness: Arc<Liveness>,
Expand Down
Loading