diff --git a/crates/autopilot/src/boundary/mod.rs b/crates/autopilot/src/boundary/mod.rs index ff98acc9fd..916b854abb 100644 --- a/crates/autopilot/src/boundary/mod.rs +++ b/crates/autopilot/src/boundary/mod.rs @@ -21,7 +21,14 @@ pub use { }, shared::order_validation::{is_order_outside_market_price, Amounts}, }; -use {crate::domain, ethrpc::Web3, std::collections::HashMap, url::Url}; +use { + crate::domain, + ethrpc::Web3, + model::time::now_in_epoch_seconds, + number::conversions::u256_to_big_uint, + std::collections::{HashMap, HashSet}, + url::Url, +}; pub mod events; pub mod order; @@ -46,3 +53,56 @@ pub struct SolvableOrders { pub quotes: HashMap, pub latest_settlement_block: u64, } + +impl SolvableOrders { + pub fn combine_with(&self, other: Self) -> Self { + let mut orders = self.orders.clone(); + let mut quotes = self.quotes.clone(); + for (uid, new_order) in other.orders { + orders.insert(uid, new_order); + } + for (uid, quote) in other.quotes { + quotes.insert(uid, quote); + } + + let now = now_in_epoch_seconds(); + orders.retain(|_uid, order| { + let expired = || { + order.data.valid_to < now + || order + .metadata + .ethflow_data + .as_ref() + .is_some_and(|data| data.user_valid_to < now as i64) + }; + let onchain_error = || { + order + .metadata + .onchain_order_data + .as_ref() + .is_some_and(|data| data.placement_error.is_some()) + }; + let fulfilled = || match order.data.kind { + OrderKind::Sell => { + order.metadata.executed_sell_amount >= u256_to_big_uint(&order.data.sell_amount) + } + OrderKind::Buy => { + order.metadata.executed_buy_amount >= u256_to_big_uint(&order.data.buy_amount) + } + }; + + !order.metadata.invalidated && !onchain_error() && !expired() && !fulfilled() + }); + + let order_uids = orders.keys().collect::>(); + quotes.retain(|uid, _quote| order_uids.contains(uid)); + + Self { + orders, + quotes, + latest_settlement_block: self + .latest_settlement_block + .max(other.latest_settlement_block), + } + } +} diff --git a/crates/autopilot/src/database/auction.rs b/crates/autopilot/src/database/auction.rs index 9594a49a59..141788f87d 100644 --- a/crates/autopilot/src/database/auction.rs +++ b/crates/autopilot/src/database/auction.rs @@ -92,6 +92,43 @@ impl Postgres { }) } + pub async fn orders_after( + &self, + after_timestamp: DateTime, + min_valid_to: u32, + ) -> Result { + let _timer = super::Metrics::get() + .database_queries + .with_label_values(&["solvable_orders_after"]) + .start_timer(); + + let mut ex = self.pool.begin().await?; + // Set the transaction isolation level to REPEATABLE READ + // so the both SELECT queries below are executed in the same database snapshot + // taken at the moment before the first query is executed. + sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") + .execute(ex.deref_mut()) + .await?; + + let orders: HashMap = + database::orders::full_orders_after(&mut ex, after_timestamp, min_valid_to as i64) + .map(|result| match result { + Ok(order) => full_order_into_model_order(order) + .map(|order| (domain::OrderUid(order.metadata.uid.0), order)), + Err(err) => Err(anyhow::Error::from(err)), + }) + .try_collect() + .await?; + let latest_settlement_block = + database::orders::latest_settlement_block(&mut ex).await? as u64; + let quotes = self.read_quotes(orders.keys()).await?; + Ok(boundary::SolvableOrders { + orders, + quotes, + latest_settlement_block, + }) + } + pub async fn replace_current_auction(&self, auction: &dto::Auction) -> Result { let _timer = super::Metrics::get() .database_queries diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index bd743bf1b7..d064073662 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -9,7 +9,6 @@ use { boundary::database::byte_array::ByteArray, chrono::{DateTime, Utc}, database::{order_events::OrderEventLabel, settlement_observations::Observation}, - futures::TryStreamExt, number::conversions::{big_decimal_to_u256, u256_to_big_decimal}, primitive_types::{H160, H256}, std::{ @@ -69,6 +68,17 @@ impl Persistence { .map_err(DatabaseError) } + pub async fn orders_after( + &self, + after_timestamp: DateTime, + min_valid_to: u32, + ) -> Result { + self.postgres + .orders_after(after_timestamp, min_valid_to) + .await + .map_err(DatabaseError) + } + pub async fn read_quotes( &self, orders: impl Iterator, @@ -390,38 +400,6 @@ impl Persistence { Ok(solution) } - pub async fn orders_after( - &self, - after_timestamp: DateTime, - min_valid_to: i64, - ) -> anyhow::Result> { - let _timer = Metrics::get() - .database_queries - .with_label_values(&["orders_after"]) - .start_timer(); - let mut ex = self.postgres.pool.acquire().await.context("begin")?; - Ok( - database::orders::full_orders_after(&mut ex, after_timestamp, min_valid_to) - .try_collect() - .await?, - ) - } - - pub async fn trades_after( - &self, - after_block: i64, - ) -> anyhow::Result> { - let _timer = Metrics::get() - .database_queries - .with_label_values(&["trades_after"]) - .start_timer(); - let mut ex = self.postgres.pool.acquire().await.context("begin")?; - Ok(database::trades::trades_after(&mut ex, after_block) - .map_ok(|trade| (domain::OrderUid(trade.order_uid.0), trade)) - .try_collect() - .await?) - } - /// Returns the oldest settlement event for which the accociated auction is /// not yet populated in the database. pub async fn get_settlement_without_auction( diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index c18fdb4228..19e2b3543a 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -17,7 +17,7 @@ use { signature::Signature, time::now_in_epoch_seconds, }, - number::conversions::{big_decimal_to_u256, u256_to_big_decimal}, + number::conversions::u256_to_big_decimal, primitive_types::{H160, H256, U256}, prometheus::{ Histogram, @@ -31,7 +31,6 @@ use { shared::{ account_balances::{BalanceFetching, Query}, bad_token::BadTokenDetecting, - db_order_conversions::full_order_into_model_order, price_estimation::{ native::NativePriceEstimating, native_price_cache::CachingNativePriceEstimator, @@ -167,92 +166,6 @@ impl SolvableOrdersCache { .map(|inner| inner.auction.clone()) } - fn build_solvable_orders( - current_orders: &boundary::SolvableOrders, - new_orders: Vec, - mut new_trades: HashMap, - new_quotes: HashMap, - ) -> Result { - let now = now_in_epoch_seconds(); - let mut orders = current_orders.orders.clone(); - // Remove expired orders. - orders.retain(|_uid, order| { - order.data.valid_to >= now - && !order - .metadata - .ethflow_data - .as_ref() - .is_some_and(|data| data.user_valid_to < now as i64) - }); - let mut quotes = current_orders.quotes.clone(); - let mut latest_trade_block = current_orders.latest_settlement_block; - - for new_order in new_orders { - let uid = domain::OrderUid(new_order.uid.0); - - if new_order.onchain_placement_error.is_some() || new_order.invalidated { - orders.remove(&uid); - quotes.remove(&uid); - continue; - } - - // Drop already visited trades to slightly reduce time complexity. - let Some(trade_amounts) = new_trades.remove(&uid) else { - continue; - }; - - let fulfilled = match new_order.kind { - database::orders::OrderKind::Sell => { - trade_amounts.sell_amount >= new_order.sell_amount - } - database::orders::OrderKind::Buy => { - trade_amounts.buy_amount >= new_order.buy_amount - } - }; - - if !fulfilled { - let order = full_order_into_model_order(new_order).map_err(anyhow::Error::from)?; - orders.insert(uid, order); - - if let Some(new_quote) = new_quotes.get(&uid) { - quotes.insert(uid, new_quote.clone()); - } - } else { - orders.remove(&uid); - quotes.remove(&uid); - } - latest_trade_block = latest_trade_block.max(trade_amounts.block_number as u64); - } - - // Iterate over remaining trades to drop orders that are already fulfilled. - for (uid, trade_amounts) in new_trades { - if let Some(order) = orders.get(&uid) { - let fulfilled = match order.data.kind { - model::order::OrderKind::Sell => { - big_decimal_to_u256(&trade_amounts.sell_amount).context("U256 overflow")? - >= order.data.sell_amount - } - model::order::OrderKind::Buy => { - big_decimal_to_u256(&trade_amounts.buy_amount).context("U256 overflow")? - >= order.data.buy_amount - } - }; - - if fulfilled { - orders.remove(&uid); - quotes.remove(&uid); - } - } - latest_trade_block = latest_trade_block.max(trade_amounts.block_number as u64); - } - - Ok(boundary::SolvableOrders { - orders, - quotes, - latest_settlement_block: latest_trade_block, - }) - } - /// Manually update solvable orders. Usually called by the background /// updating task. /// @@ -265,7 +178,11 @@ impl SolvableOrdersCache { let db_solvable_orders = { let lock = self.cache.lock().await; if let Some(cache) = &*lock { - self.updated_solvable_orders(min_valid_to, cache).await? + let new_orders = self + .persistence + .orders_after(cache.last_order_creation_timestamp, min_valid_to) + .await?; + cache.solvable_orders.combine_with(new_orders) } else { self.persistence.all_solvable_orders(min_valid_to).await? } @@ -420,27 +337,23 @@ impl SolvableOrdersCache { Ok(()) } - async fn updated_solvable_orders( - &self, - min_valid_to: u32, - cache: &Inner, - ) -> Result { - let new_orders_fut = self - .persistence - .orders_after(cache.last_order_creation_timestamp, min_valid_to as i64); - let new_trades_fut = self.persistence.trades_after( - i64::try_from(cache.solvable_orders.latest_settlement_block) - .context("block number value exceeds i64")?, - ); - let (new_orders, new_trades) = tokio::try_join!(new_orders_fut, new_trades_fut)?; - let order_uids = new_orders - .iter() - .map(|order| domain::OrderUid(order.uid.0)) - .collect::>(); - let quotes = self.persistence.read_quotes(order_uids.iter()).await?; - - Self::build_solvable_orders(&cache.solvable_orders, new_orders, new_trades, quotes) - } + // async fn updated_solvable_orders( + // &self, + // min_valid_to: u32, + // cache: &Inner, + // ) -> Result { + // let new_orders_fut = self + // .persistence + // .orders_after(cache.last_order_creation_timestamp, min_valid_to as + // i64); let (new_orders, new_trades) = tokio::try_join!(new_orders_fut, + // new_trades_fut)?; let order_uids = new_orders + // .iter() + // .map(|order| domain::OrderUid(order.uid.0)) + // .collect::>(); + // let quotes = self.persistence.read_quotes(order_uids.iter()).await?; + // + // Self::build_solvable_orders(&cache.solvable_orders, new_orders, + // new_trades, quotes) } async fn fetch_balances(&self, queries: Vec) -> HashMap { let fetched_balances = { diff --git a/crates/database/src/orders.rs b/crates/database/src/orders.rs index b2492eaed5..9b198219b8 100644 --- a/crates/database/src/orders.rs +++ b/crates/database/src/orders.rs @@ -701,7 +701,7 @@ pub fn full_orders_after( " LEFT OUTER JOIN ethflow_orders eth_o on eth_o.uid = o.uid ", " WHERE (o.creation_timestamp > $1 OR o.cancellation_timestamp > $1)", " AND o.valid_to >= $2", - " AND CASE WHEN eth_o.valid_to IS NULL THEN true ELSE eth_o.valid_to >= $1 END", + " AND CASE WHEN eth_o.valid_to IS NULL THEN true ELSE eth_o.valid_to >= $2 END", ); sqlx::query_as(QUERY) .bind(after_timestamp) @@ -1619,7 +1619,7 @@ mod tests { #[tokio::test] #[ignore] - async fn postgres_orders_without_trades_after() { + async fn postgres_orders_after() { let mut db = PgConnection::connect("postgresql://").await.unwrap(); let mut db = db.begin().await.unwrap(); crate::clear_DANGER_(&mut db).await.unwrap(); diff --git a/crates/database/src/trades.rs b/crates/database/src/trades.rs index b55b455248..bb39ea8f1a 100644 --- a/crates/database/src/trades.rs +++ b/crates/database/src/trades.rs @@ -75,32 +75,13 @@ ON o.uid = t.order_uid"#; .fetch(ex) } -pub fn trades_after( - ex: &mut PgConnection, - after_block: i64, -) -> BoxStream<'_, Result> { - const QUERY: &str = r#" -SELECT - MAX(t.block_number) as block_number, - t.order_uid, - SUM(t.buy_amount) as buy_amount, - SUM(t.sell_amount) as sell_amount, - SUM(t.fee_amount) as fee_amount -FROM trades t -WHERE t.block_number > $1 -GROUP BY t.order_uid -"#; - - sqlx::query_as(QUERY).bind(after_block).fetch(ex) -} - #[cfg(test)] mod tests { use { super::*, crate::{ byte_array::ByteArray, - events::{self, Event, EventIndex, Settlement, Trade}, + events::{Event, EventIndex, Settlement, Trade}, onchain_broadcasted_orders::{insert_onchain_order, OnchainOrderPlacement}, orders::Order, PgTransaction, @@ -351,98 +332,6 @@ mod tests { assert_trades(&mut db, Some(&owners[0]), None, &[]).await; } - #[tokio::test] - #[ignore] - async fn postgres_trade_after() { - let mut db = PgConnection::connect("postgresql://").await.unwrap(); - let mut db = db.begin().await.unwrap(); - crate::clear_DANGER_(&mut db).await.unwrap(); - - async fn get_trades_after( - ex: &mut PgConnection, - min_block: i64, - ) -> Result, sqlx::Error> { - trades_after(ex, min_block).try_collect().await - } - - let (index_a, event_a) = { - ( - EventIndex { - block_number: 1, - ..Default::default() - }, - Trade { - order_uid: ByteArray([1u8; 56]), - sell_amount_including_fee: BigDecimal::from(10), - buy_amount: BigDecimal::from(100), - fee_amount: BigDecimal::from(1), - }, - ) - }; - let (index_b, event_b) = { - ( - EventIndex { - block_number: 2, - ..Default::default() - }, - Trade { - order_uid: ByteArray([1u8; 56]), - sell_amount_including_fee: BigDecimal::from(20), - buy_amount: BigDecimal::from(200), - fee_amount: BigDecimal::from(2), - }, - ) - }; - let (index_c, event_c) = { - ( - EventIndex { - block_number: 1, - log_index: 1, - }, - Trade { - order_uid: ByteArray([2u8; 56]), - sell_amount_including_fee: BigDecimal::from(40), - buy_amount: BigDecimal::from(400), - fee_amount: BigDecimal::from(4), - }, - ) - }; - - events::insert_trade(&mut db, &index_a, &event_a) - .await - .unwrap(); - events::insert_trade(&mut db, &index_b, &event_b) - .await - .unwrap(); - events::insert_trade(&mut db, &index_c, &event_c) - .await - .unwrap(); - - assert!(get_trades_after(&mut db, 2).await.unwrap().is_empty()); - - let mut result = get_trades_after(&mut db, 0).await.unwrap(); - result.sort_by_key(|t| t.block_number); - assert_eq!( - result, - vec![ - TradedAmounts { - block_number: 1, - order_uid: ByteArray([2u8; 56]), - sell_amount: BigDecimal::from(40), - buy_amount: BigDecimal::from(400), - fee_amount: BigDecimal::from(4), - }, - TradedAmounts { - block_number: 2, - order_uid: ByteArray([1u8; 56]), - sell_amount: BigDecimal::from(30), - buy_amount: BigDecimal::from(300), - fee_amount: BigDecimal::from(3), - }, - ] - ); - } - // Testing Trades with settlements async fn add_settlement( ex: &mut PgTransaction<'_>, diff --git a/crates/number/src/conversions.rs b/crates/number/src/conversions.rs index 747e533d6d..5283a86cbd 100644 --- a/crates/number/src/conversions.rs +++ b/crates/number/src/conversions.rs @@ -41,17 +41,13 @@ pub fn big_rational_to_u256(ratio: &BigRational) -> Result { pub fn u256_to_big_decimal(u256: &U256) -> BigDecimal { let big_uint = u256_to_big_uint(u256); - big_uint_to_big_decimal(&big_uint) + BigDecimal::from(BigInt::from(big_uint)) } pub fn big_decimal_to_big_uint(big_decimal: &BigDecimal) -> Option { big_decimal.to_bigint()?.try_into().ok() } -pub fn big_uint_to_big_decimal(big_uint: &BigUint) -> BigDecimal { - BigDecimal::from(BigInt::from(big_uint.clone())) -} - pub fn big_decimal_to_u256(big_decimal: &BigDecimal) -> Option { if !big_decimal.is_integer() { return None;