From 416c7676768dd93de05e2027704677e9dfc500e6 Mon Sep 17 00:00:00 2001 From: Dusan Stanivukovic Date: Fri, 4 Oct 2024 10:38:49 +0200 Subject: [PATCH] Multiple winners in autopilot (#2996) # Description Fixes https://github.com/cowprotocol/services/issues/2994 Fixes https://github.com/cowprotocol/services/issues/2995 Fixes https://github.com/cowprotocol/services/issues/2998 - [ ] Adds configuration parameter `max_winners_per_auction` to enable/disable multiple winners feature. - [ ] Chooses the winners based on the `max_winners_per_auction`. - [ ] Then issues multiple settle calls for each of the winners. The only remaining part to implement is competition saving but this is captured with https://github.com/cowprotocol/services/issues/3021 --- crates/autopilot/src/arguments.rs | 7 + .../autopilot/src/infra/solvers/dto/solve.rs | 43 +++--- crates/autopilot/src/run.rs | 2 + crates/autopilot/src/run_loop.rs | 115 +++++++++++----- crates/autopilot/src/shadow.rs | 130 +++++++++++++----- 5 files changed, 209 insertions(+), 88 deletions(-) diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 16efd33716..a32869bb63 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -232,6 +232,11 @@ pub struct Arguments { /// If the value is 0, the native prices are fetched from the cache #[clap(long, env, default_value = "0s", value_parser = humantime::parse_duration)] pub run_loop_native_price_timeout: Duration, + + #[clap(long, env, default_value = "1")] + /// The maximum number of winners per auction. Each winner will be allowed + /// to settle their winning orders at the same time. + pub max_winners_per_auction: usize, } impl std::fmt::Display for Arguments { @@ -277,6 +282,7 @@ impl std::fmt::Display for Arguments { run_loop_mode, max_run_loop_delay, run_loop_native_price_timeout, + max_winners_per_auction, } = self; write!(f, "{}", shared)?; @@ -356,6 +362,7 @@ impl std::fmt::Display for Arguments { "run_loop_native_price_timeout: {:?}", run_loop_native_price_timeout )?; + writeln!(f, "max_winners_per_auction: {:?}", max_winners_per_auction)?; Ok(()) } } diff --git a/crates/autopilot/src/infra/solvers/dto/solve.rs b/crates/autopilot/src/infra/solvers/dto/solve.rs index 8b4682d5d1..04670462bd 100644 --- a/crates/autopilot/src/infra/solvers/dto/solve.rs +++ b/crates/autopilot/src/infra/solvers/dto/solve.rs @@ -98,27 +98,7 @@ impl Solution { domain::competition::Score::new(self.score.into())?, self.orders .into_iter() - .map(|(o, amounts)| { - ( - o.into(), - domain::competition::TradedOrder { - sell: eth::Asset { - token: amounts.sell_token.into(), - amount: amounts.limit_sell.into(), - }, - buy: eth::Asset { - token: amounts.buy_token.into(), - amount: amounts.limit_buy.into(), - }, - side: match amounts.side { - Side::Buy => domain::auction::order::Side::Buy, - Side::Sell => domain::auction::order::Side::Sell, - }, - executed_sell: amounts.executed_sell.into(), - executed_buy: amounts.executed_buy.into(), - }, - ) - }) + .map(|(o, amounts)| (o.into(), amounts.into_domain())) .collect(), self.clearing_prices .into_iter() @@ -155,6 +135,27 @@ pub struct TradedOrder { executed_buy: U256, } +impl TradedOrder { + pub fn into_domain(self) -> domain::competition::TradedOrder { + domain::competition::TradedOrder { + sell: eth::Asset { + token: self.sell_token.into(), + amount: self.limit_sell.into(), + }, + buy: eth::Asset { + token: self.buy_token.into(), + amount: self.limit_buy.into(), + }, + side: match self.side { + Side::Buy => domain::auction::order::Side::Buy, + Side::Sell => domain::auction::order::Side::Sell, + }, + executed_sell: self.executed_sell.into(), + executed_buy: self.executed_buy.into(), + } + } +} + #[serde_as] #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 06c95c2e8b..3066855d61 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -528,6 +528,7 @@ pub async fn run(args: Arguments) { solve_deadline: args.solve_deadline, synchronization: args.run_loop_mode, max_run_loop_delay: args.max_run_loop_delay, + max_winners_per_auction: args.max_winners_per_auction, }; let run = RunLoop::new( @@ -622,6 +623,7 @@ async fn shadow_mode(args: Arguments) -> ! { liveness.clone(), args.run_loop_mode, current_block, + args.max_winners_per_auction, ); shadow.run_forever().await; diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 1e590e40a6..ac2ed3c197 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -35,7 +35,7 @@ use { rand::seq::SliceRandom, shared::token_list::AutoUpdatingTokenList, std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, sync::Arc, time::{Duration, Instant}, }, @@ -52,6 +52,7 @@ pub struct Config { /// allowed to start before it has to re-synchronize to the blockchain /// by waiting for the next block to appear. pub max_run_loop_delay: Duration, + pub max_winners_per_auction: usize, } pub struct RunLoop { @@ -83,6 +84,13 @@ impl RunLoop { liveness: Arc, maintenance: Arc, ) -> Self { + // Added to make sure no more than one winner is activated by accident + // Should be removed once we decide to activate "multiple winners per auction" + // feature. + assert_eq!( + config.max_winners_per_auction, 1, + "only one winner is supported" + ); Self { config, eth, @@ -238,35 +246,36 @@ impl RunLoop { let auction = self.remove_in_flight_orders(auction).await; let solutions = self.competition(&auction).await; - if solutions.is_empty() { - tracing::info!("no solutions for auction"); + let winners = self.select_winners(&solutions); + if winners.is_empty() { + tracing::info!("no winners for auction"); return; } let competition_simulation_block = self.eth.current_block().borrow().number; + let block_deadline = competition_simulation_block + self.config.submission_deadline; + + // Post-processing should not be executed asynchronously since it includes steps + // of storing all the competition/auction-related data to the DB. + if let Err(err) = self + .post_processing( + &auction, + competition_simulation_block, + // TODO: Support multiple winners + // https://github.com/cowprotocol/services/issues/3021 + &winners.first().expect("must exist").solution, + &solutions, + block_deadline, + ) + .await + { + tracing::error!(?err, "failed to post-process competition"); + return; + } - // TODO: Keep going with other solutions until some deadline. - if let Some(Participant { driver, solution }) = solutions.last() { + for Participant { driver, solution } in winners { tracing::info!(driver = %driver.name, solution = %solution.id(), "winner"); - let block_deadline = competition_simulation_block + self.config.submission_deadline; - - // Post-processing should not be executed asynchronously since it includes steps - // of storing all the competition/auction-related data to the DB. - if let Err(err) = self - .post_processing( - &auction, - competition_simulation_block, - solution, - &solutions, - block_deadline, - ) - .await - { - tracing::error!(?err, "failed to post-process competition"); - return; - } - self.start_settlement_execution( auction.id, single_run_start, @@ -374,15 +383,15 @@ impl RunLoop { auction: &domain::Auction, competition_simulation_block: u64, winning_solution: &competition::Solution, - solutions: &[Participant], + solutions: &VecDeque, block_deadline: u64, ) -> Result<()> { let start = Instant::now(); let winner = winning_solution.solver().into(); let winning_score = winning_solution.score().get().0; let reference_score = solutions - .iter() - .nth_back(1) + // todo multiple winners per auction + .get(1) .map(|participant| participant.solution.score().get().0) .unwrap_or_default(); let participants = solutions @@ -426,6 +435,8 @@ impl RunLoop { }, solutions: solutions .iter() + // reverse as solver competition table is sorted from worst to best, so we need to keep the ordering for backwards compatibility + .rev() .enumerate() .map(|(index, participant)| SolverSettlement { solver: participant.driver.name.clone(), @@ -489,8 +500,8 @@ impl RunLoop { } /// Runs the solver competition, making all configured drivers participate. - /// Returns all fair solutions sorted by their score (worst to best). - async fn competition(&self, auction: &domain::Auction) -> Vec { + /// Returns all fair solutions sorted by their score (best to worst). + async fn competition(&self, auction: &domain::Auction) -> VecDeque { let request = solve::Request::new( auction, &self.market_makable_token_list.all(), @@ -514,11 +525,14 @@ impl RunLoop { // Shuffle so that sorting randomly splits ties. solutions.shuffle(&mut rand::thread_rng()); - solutions.sort_unstable_by_key(|participant| participant.solution.score().get().0); + solutions.sort_unstable_by_key(|participant| { + std::cmp::Reverse(participant.solution.score().get().0) + }); // Make sure the winning solution is fair. - while !Self::is_solution_fair(solutions.last(), &solutions, auction) { - let unfair_solution = solutions.pop().expect("must exist"); + let mut solutions = solutions.into_iter().collect::>(); + while !Self::is_solution_fair(solutions.front(), &solutions, auction) { + let unfair_solution = solutions.pop_front().expect("must exist"); tracing::warn!( invalidated = unfair_solution.driver.name, "fairness check invalidated of solution" @@ -529,10 +543,39 @@ impl RunLoop { solutions } + /// Chooses the winners from the given participants. + /// + /// Participants are already sorted by their score (best to worst). + /// + /// Winners are selected one by one, starting from the best solution, + /// until `max_winners_per_auction` are selected. The solution is a winner + /// if it swaps tokens that are not yet swapped by any other already + /// selected winner. + fn select_winners<'a>(&self, participants: &'a VecDeque) -> Vec<&'a Participant> { + let mut winners = Vec::new(); + let mut already_swapped_tokens = HashSet::new(); + for participant in participants.iter() { + let swapped_tokens = participant + .solution + .orders() + .iter() + .flat_map(|(_, order)| vec![order.sell.token, order.buy.token]) + .collect::>(); + if swapped_tokens.is_disjoint(&already_swapped_tokens) { + winners.push(participant); + already_swapped_tokens.extend(swapped_tokens); + if winners.len() >= self.config.max_winners_per_auction { + break; + } + } + } + winners + } + /// Records metrics, order events and logs for the given solutions. - /// Expects the winning solution to be the last in the list. - fn report_on_solutions(&self, solutions: &[Participant], auction: &domain::Auction) { - let Some(winner) = solutions.last() else { + /// Expects the winning solution to be the first in the list. + fn report_on_solutions(&self, solutions: &VecDeque, auction: &domain::Auction) { + let Some(winner) = solutions.front() else { // no solutions means nothing to report return; }; @@ -551,7 +594,7 @@ impl RunLoop { .flat_map(|solution| solution.solution.order_ids().copied()) .collect(); let winning_orders: HashSet<_> = solutions - .last() + .front() .into_iter() .flat_map(|solution| solution.solution.order_ids().copied()) .collect(); @@ -577,7 +620,7 @@ impl RunLoop { /// Returns true if winning solution is fair or winner is None fn is_solution_fair( winner: Option<&Participant>, - remaining: &Vec, + remaining: &VecDeque, auction: &domain::Auction, ) -> bool { let Some(winner) = winner else { return true }; diff --git a/crates/autopilot/src/shadow.rs b/crates/autopilot/src/shadow.rs index 5899410810..13007cf302 100644 --- a/crates/autopilot/src/shadow.rs +++ b/crates/autopilot/src/shadow.rs @@ -10,7 +10,7 @@ use { crate::{ arguments::RunLoopMode, - domain, + domain::{self, competition::TradedOrder}, infra::{ self, solvers::dto::{reveal, solve}, @@ -24,7 +24,12 @@ use { primitive_types::{H160, U256}, rand::seq::SliceRandom, shared::token_list::AutoUpdatingTokenList, - std::{cmp, sync::Arc, time::Duration}, + std::{ + cmp, + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, + }, tracing::Instrument, }; @@ -38,9 +43,11 @@ pub struct RunLoop { liveness: Arc, synchronization: RunLoopMode, current_block: CurrentBlockWatcher, + max_winners_per_auction: usize, } impl RunLoop { + #[allow(clippy::too_many_arguments)] pub fn new( orderbook: infra::shadow::Orderbook, drivers: Vec, @@ -49,7 +56,12 @@ impl RunLoop { liveness: Arc, synchronization: RunLoopMode, current_block: CurrentBlockWatcher, + max_winners_per_auction: usize, ) -> Self { + // Added to make sure no more than one winner is activated by accident + // Should be removed once we decide to activate "multiple winners per auction" + // feature. + assert_eq!(max_winners_per_auction, 1, "only one winner is supported"); Self { orderbook, drivers, @@ -60,6 +72,7 @@ impl RunLoop { liveness, synchronization, current_block, + max_winners_per_auction, } } @@ -119,30 +132,35 @@ impl RunLoop { .orders .set(i64::try_from(auction.orders.len()).unwrap_or(i64::MAX)); - let mut participants = self.competition(auction).await; - - // Shuffle so that sorting randomly splits ties. - participants.shuffle(&mut rand::thread_rng()); - participants.sort_unstable_by_key(|participant| cmp::Reverse(participant.score())); + let participants = self.competition(auction).await; + let winners = self.select_winners(&participants); - if let Some(Participant { - driver, - solution: Ok(solution), - }) = participants.first() - { - let reference_score = participants - .get(1) - .map(|participant| participant.score()) + for (i, Participant { driver, solution }) in winners.iter().enumerate() { + let score = solution + .as_ref() + .map(|solution| solution.score.get()) .unwrap_or_default(); - let reward = solution - .score - .get() + let reference_score = winners + .get(i + 1) + .map(|winner| winner.score()) + .unwrap_or_else(|| { + // If this was the last winning solution pick the first worse overall + // solution that came from a different driver (or 0) as the reference score. + participants + .iter() + .filter(|p| p.driver.name != driver.name) + .filter_map(|p| p.solution.as_ref().ok()) + .map(|p| p.score.get()) + .find(|other_score| *other_score <= score) + .unwrap_or_default() + }); + let reward = score .checked_sub(reference_score) .expect("reference score unexpectedly larger than winner's score"); tracing::info!( driver =% driver.name, - score =% solution.score, + %score, %reward, "winner" ); @@ -190,11 +208,48 @@ impl RunLoop { let request = solve::Request::new(auction, &self.trusted_tokens.all(), self.solve_deadline); let request = &request; - futures::future::join_all(self.drivers.iter().map(|driver| async move { - let solution = self.participate(driver, request).await; - Participant { driver, solution } - })) - .await + let mut participants = + futures::future::join_all(self.drivers.iter().map(|driver| async move { + let solution = self.participate(driver, request).await; + Participant { driver, solution } + })) + .await; + + // Shuffle so that sorting randomly splits ties. + participants.shuffle(&mut rand::thread_rng()); + participants.sort_unstable_by_key(|participant| cmp::Reverse(participant.score())); + + participants + } + + /// Chooses the winners from the given participants. + /// + /// Participants are already sorted by their score (best to worst). + /// + /// Winners are selected one by one, starting from the best solution, + /// until `max_winners_per_auction` are selected. The solution is a winner + /// if it swaps tokens that are not yet swapped by any other already + /// selected winner. + fn select_winners<'a>(&self, participants: &'a [Participant<'a>]) -> Vec<&'a Participant<'a>> { + let mut winners = Vec::new(); + let mut already_swapped_tokens = HashSet::new(); + for participant in participants.iter() { + if let Ok(solution) = &participant.solution { + let swapped_tokens = solution + .orders() + .iter() + .flat_map(|(_, order)| vec![order.sell.token, order.buy.token]) + .collect::>(); + if swapped_tokens.is_disjoint(&already_swapped_tokens) { + winners.push(participant); + already_swapped_tokens.extend(swapped_tokens); + if winners.len() >= self.max_winners_per_auction { + break; + } + } + } + } + winners } /// Computes a driver's solutions in the shadow competition. @@ -207,7 +262,7 @@ impl RunLoop { .await .map_err(|_| Error::Timeout)? .map_err(Error::Solve)?; - let (score, solution_id, submission_address) = proposed + let (score, solution_id, submission_address, orders) = proposed .solutions .into_iter() .max_by_key(|solution| solution.score) @@ -216,11 +271,16 @@ impl RunLoop { solution.score, solution.solution_id, solution.submission_address, + solution.orders, ) }) .ok_or(Error::NoSolutions)?; let score = NonZeroU256::new(score).ok_or(Error::ZeroScore)?; + let orders = orders + .into_iter() + .map(|(order_uid, amounts)| (order_uid.into(), amounts.into_domain())) + .collect(); let revealed = driver .reveal(&reveal::Request { solution_id }) @@ -238,6 +298,7 @@ impl RunLoop { score, account: submission_address, calldata: revealed.calldata, + orders, }) } } @@ -247,12 +308,6 @@ struct Participant<'a> { solution: Result, } -struct Solution { - score: NonZeroU256, - account: H160, - calldata: reveal::Calldata, -} - impl Participant<'_> { fn score(&self) -> U256 { self.solution @@ -262,6 +317,19 @@ impl Participant<'_> { } } +struct Solution { + score: NonZeroU256, + account: H160, + calldata: reveal::Calldata, + orders: HashMap, +} + +impl Solution { + fn orders(&self) -> &HashMap { + &self.orders + } +} + #[derive(Debug, thiserror::Error)] enum Error { #[error("the solver timed out")]