From ed09b91d4f0d8dd89c8fe16feed92be31e0a181b Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 5 Sep 2024 17:23:20 +0300 Subject: [PATCH] Incremental solvable orders cache update (#2923) # Description > Updating solvable orders (ie creating a new auction) currently takes >2s with some pretty heavy outliers ([logs](https://production-6de61f.kb.eu-central-1.aws.cloud.es.io/app/r/s/ALsEK)) > > This makes it hard to bring CoW protocol's auction rate down to one batch per block as simply creating up to date state would take >15% of the total time we have at hand. We should at least be able to half this time (if not getting it down even more) In order to relieve the situation, it was proposed to introduce incremental solvable orders cache update, which selects all the solvable orders using the old heavy query only at startup, stores the latest received order's creation timestamp in memory, and then makes much faster incremental bounded queries to the orders and additional tables that select fewer data and executes faster. # Changes Since incremental fetching retrieves orders created/cancelled after the specific timestamps, it is also required now to fetch orders that have any onchain update based on the last fetched block number. Having said that, the data needs to be fetched within a single TX, so there is no way to run all the queries in parallel. 1. If the current solvable orders cache is empty, execute the original heavy SQL query to fetch all current solvable orders and store them in memory. 2. Otherwise, fetch full orders that created or cancelled after the last stored timestamp and also find UIDs of the order that have any onchain data updated after the latest observed block number. This includes fetching data from the following tables: trades, ethflow_data, order_execution, invalidations, onchain_order_invalidations, onchain_placed_orders, presignature_events. 3. Fetch quotes for all the collected orders. 4. Add all the newly received orders to the cache. 5. Filter out all the orders that are one of: contain on-chain errors, expired, fulfilled, invalidated. 6. Calculate the latest observed order creation timestamp. 7. Continue with the regular auction creation process. As a result, we now have 3 SQL queries where each executes in ~50ms instead of a single one taking ~2s. ## How to test New DB tests. Existing e2e tests. ## Related Issues Fixes #2831 --- Cargo.lock | 1 + crates/autopilot/src/boundary/mod.rs | 2 +- crates/autopilot/src/database/auction.rs | 37 +- crates/autopilot/src/database/quotes.rs | 5 + crates/autopilot/src/infra/persistence/mod.rs | 147 +++++++- crates/autopilot/src/run_loop.rs | 2 +- crates/autopilot/src/solvable_orders.rs | 133 +++++-- crates/database/Cargo.toml | 1 + crates/database/src/orders.rs | 347 +++++++++++++++++- crates/orderbook/src/database/orders.rs | 5 +- ...ate_indexes_for_solvable_orders_search.sql | 4 + 11 files changed, 607 insertions(+), 77 deletions(-) create mode 100644 database/sql/V069__create_indexes_for_solvable_orders_search.sql diff --git a/Cargo.lock b/Cargo.lock index 8423435c94..7e2c79d5f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1659,6 +1659,7 @@ dependencies = [ "futures", "hex", "hex-literal", + "maplit", "sqlx", "strum", "tokio", diff --git a/crates/autopilot/src/boundary/mod.rs b/crates/autopilot/src/boundary/mod.rs index b89ec8b3bc..a139335689 100644 --- a/crates/autopilot/src/boundary/mod.rs +++ b/crates/autopilot/src/boundary/mod.rs @@ -36,7 +36,7 @@ pub fn web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web } pub struct SolvableOrders { - pub orders: Vec, + pub orders: HashMap, pub quotes: HashMap, pub latest_settlement_block: u64, } diff --git a/crates/autopilot/src/database/auction.rs b/crates/autopilot/src/database/auction.rs index 9230ce72a3..7ea8533ca6 100644 --- a/crates/autopilot/src/database/auction.rs +++ b/crates/autopilot/src/database/auction.rs @@ -5,12 +5,13 @@ use { chrono::{DateTime, Utc}, futures::{StreamExt, TryStreamExt}, model::{order::Order, quote::QuoteId}, + num::ToPrimitive, shared::{ db_order_conversions::full_order_into_model_order, event_storing_helpers::{create_db_search_parameters, create_quote_row}, order_quoting::{QuoteData, QuoteSearchParameters, QuoteStoring}, }, - std::ops::DerefMut, + std::{collections::HashMap, ops::DerefMut}, }; #[async_trait::async_trait] @@ -60,7 +61,7 @@ impl QuoteStoring for Postgres { } impl Postgres { - pub async fn solvable_orders(&self, min_valid_to: u32) -> Result { + pub async fn all_solvable_orders(&self, min_valid_to: u32) -> Result { let _timer = super::Metrics::get() .database_queries .with_label_values(&["solvable_orders"]) @@ -73,24 +74,20 @@ impl Postgres { sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") .execute(ex.deref_mut()) .await?; - let orders: Vec = database::orders::solvable_orders(&mut ex, min_valid_to as i64) - .map(|result| match result { - Ok(order) => full_order_into_model_order(order), - Err(err) => Err(anyhow::Error::from(err)), - }) - .try_collect() - .await?; - let latest_settlement_block = - database::orders::latest_settlement_block(&mut ex).await? as u64; - let quotes = self - .read_quotes( - orders - .iter() - .map(|order| domain::OrderUid(order.metadata.uid.0)) - .collect::>() - .iter(), - ) - .await?; + let orders: HashMap = + database::orders::solvable_orders(&mut ex, i64::from(min_valid_to)) + .map(|result| match result { + Ok(order) => full_order_into_model_order(order) + .map(|order| (domain::OrderUid(order.metadata.uid.0), order)), + Err(err) => Err(anyhow::Error::from(err)), + }) + .try_collect() + .await?; + let latest_settlement_block = database::orders::latest_settlement_block(&mut ex) + .await? + .to_u64() + .context("latest_settlement_block is not u64")?; + let quotes = self.read_quotes(orders.keys()).await?; Ok(boundary::SolvableOrders { orders, quotes, diff --git a/crates/autopilot/src/database/quotes.rs b/crates/autopilot/src/database/quotes.rs index 79a6ad15dc..4205f51780 100644 --- a/crates/autopilot/src/database/quotes.rs +++ b/crates/autopilot/src/database/quotes.rs @@ -30,6 +30,11 @@ impl Postgres { &self, orders: impl Iterator, ) -> Result, sqlx::Error> { + let _timer = super::Metrics::get() + .database_queries + .with_label_values(&["read_quotes"]) + .start_timer(); + let mut ex = self.pool.acquire().await?; let order_uids: Vec<_> = orders.map(|uid| ByteArray(uid.0)).collect(); let quotes: HashMap<_, _> = database::orders::read_quotes(&mut ex, &order_uids) diff --git a/crates/autopilot/src/infra/persistence/mod.rs b/crates/autopilot/src/infra/persistence/mod.rs index 9e93f1f8d9..c855843eab 100644 --- a/crates/autopilot/src/infra/persistence/mod.rs +++ b/crates/autopilot/src/infra/persistence/mod.rs @@ -6,8 +6,9 @@ use { infra::persistence::dto::AuctionId, }, anyhow::Context, + bigdecimal::ToPrimitive, boundary::database::byte_array::ByteArray, - chrono::Utc, + chrono::{DateTime, Utc}, database::{ order_events::OrderEventLabel, orders::{ @@ -22,10 +23,14 @@ use { SellTokenSource as DomainSellTokenSource, SigningScheme as DomainSigningScheme, }, - number::conversions::{big_decimal_to_u256, u256_to_big_decimal}, + futures::{StreamExt, TryStreamExt}, + itertools::Itertools, + number::conversions::{big_decimal_to_u256, u256_to_big_decimal, u256_to_big_uint}, primitive_types::{H160, H256}, + shared::db_order_conversions::full_order_into_model_order, std::{ collections::{HashMap, HashSet}, + ops::DerefMut, sync::Arc, }, tracing::Instrument, @@ -71,12 +76,13 @@ impl Persistence { .map_err(DatabaseError) } - pub async fn solvable_orders( + /// Finds solvable orders based on the order's min validity period. + pub async fn all_solvable_orders( &self, min_valid_to: u32, ) -> Result { self.postgres - .solvable_orders(min_valid_to) + .all_solvable_orders(min_valid_to) .await .map_err(DatabaseError) } @@ -395,6 +401,139 @@ impl Persistence { Ok(solution) } + /// Computes solvable orders based on the latest observed block number, + /// order creation timestamp, and minimum validity period. + pub async fn solvable_orders_after( + &self, + current_orders: HashMap, + after_timestamp: DateTime, + after_block: u64, + min_valid_to: u32, + ) -> anyhow::Result { + let after_block = i64::try_from(after_block).context("block number value exceeds i64")?; + let mut tx = self.postgres.pool.begin().await.context("begin")?; + // Set the transaction isolation level to REPEATABLE READ + // so all the SELECT queries below are executed in the same database snapshot + // taken at the moment before the first query is executed. + sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ") + .execute(tx.deref_mut()) + .await?; + + // Fetch the orders that were updated after the given block and were created or + // cancelled after the given timestamp. + let next_orders: HashMap = { + let _timer = Metrics::get() + .database_queries + .with_label_values(&["open_orders_after"]) + .start_timer(); + + database::orders::open_orders_after(&mut tx, after_block, after_timestamp) + .map(|result| match result { + Ok(order) => full_order_into_model_order(order) + .map(|order| (domain::OrderUid(order.metadata.uid.0), order)), + Err(err) => Err(anyhow::Error::from(err)), + }) + .try_collect() + .await? + }; + + // Fetch quotes for new orders and also update them for the cached ones since + // they could also be updated. + let updated_quotes = { + let _timer = Metrics::get() + .database_queries + .with_label_values(&["read_quotes"]) + .start_timer(); + + let all_order_uids = next_orders + .keys() + .chain(current_orders.keys()) + .unique() + .map(|uid| ByteArray(uid.0)) + .collect::>(); + + database::orders::read_quotes(&mut tx, &all_order_uids) + .await? + .into_iter() + .filter_map(|quote| { + let order_uid = domain::OrderUid(quote.order_uid.0); + dto::quote::into_domain(quote) + .map_err(|err| { + tracing::warn!(?order_uid, ?err, "failed to convert quote from db") + }) + .ok() + .map(|quote| (order_uid, quote)) + }) + .collect() + }; + + let latest_settlement_block = database::orders::latest_settlement_block(&mut tx) + .await? + .to_u64() + .context("latest_settlement_block is not u64")?; + + Self::build_solvable_orders( + current_orders, + next_orders, + updated_quotes, + latest_settlement_block, + min_valid_to, + ) + } + + fn build_solvable_orders( + mut current_orders: HashMap, + next_orders: HashMap, + mut next_quotes: HashMap, + latest_settlement_block: u64, + min_valid_to: u32, + ) -> anyhow::Result { + // Blindly insert all new orders into the cache. + for (uid, order) in next_orders { + current_orders.insert(uid, order); + } + + // Filter out all the invalid orders. + current_orders.retain(|_uid, order| { + let expired = order.data.valid_to < min_valid_to + || order + .metadata + .ethflow_data + .as_ref() + .is_some_and(|data| data.user_valid_to < i64::from(min_valid_to)); + + let invalidated = order.metadata.invalidated; + let onchain_error = order + .metadata + .onchain_order_data + .as_ref() + .is_some_and(|data| data.placement_error.is_some()); + let fulfilled = { + match order.data.kind { + model::order::OrderKind::Sell => { + order.metadata.executed_sell_amount + >= u256_to_big_uint(&order.data.sell_amount) + } + model::order::OrderKind::Buy => { + order.metadata.executed_buy_amount + >= u256_to_big_uint(&order.data.buy_amount) + } + } + }; + + !expired && !invalidated && !onchain_error && !fulfilled + }); + + // Keep only relevant quotes. + next_quotes.retain(|uid, _quote| current_orders.contains_key(uid)); + + Ok(boundary::SolvableOrders { + orders: current_orders, + quotes: next_quotes, + latest_settlement_block, + }) + } + /// Returns the oldest settlement event for which the accociated auction is /// not yet populated in the database. pub async fn get_settlement_without_auction( diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 363ec566ca..8d7a85bda8 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -150,7 +150,7 @@ impl RunLoop { } async fn cut_auction(&self) -> Option { - let auction = match self.solvable_orders_cache.current_auction() { + let auction = match self.solvable_orders_cache.current_auction().await { Some(auction) => auction, None => { tracing::debug!("no current auction"); diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index 1db6fb7209..aa129c868f 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -1,10 +1,12 @@ use { crate::{ + boundary::{self, SolvableOrders}, domain::{self, auction::Price, eth}, infra::{self, banned}, }, - anyhow::Result, + anyhow::{Context, Result}, bigdecimal::BigDecimal, + chrono::{DateTime, Utc}, database::order_events::OrderEventLabel, ethrpc::block_stream::CurrentBlockWatcher, futures::future::join_all, @@ -38,11 +40,11 @@ use { }, std::{ collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}, - sync::{Arc, Mutex, Weak}, + sync::{Arc, Weak}, time::Duration, }, strum::VariantNames, - tokio::time::Instant, + tokio::{sync::Mutex, time::Instant}, tracing::Instrument, }; @@ -91,7 +93,7 @@ pub struct SolvableOrdersCache { banned_users: banned::Users, balance_fetcher: Arc, bad_token_detector: Arc, - cache: Mutex, + cache: Mutex>, native_price_estimator: Arc, signature_validator: Arc, metrics: &'static Metrics, @@ -104,8 +106,9 @@ pub struct SolvableOrdersCache { type Balances = HashMap; struct Inner { - auction: Option, - update_time: Instant, + auction: domain::Auction, + solvable_orders: boundary::SolvableOrders, + last_order_creation_timestamp: DateTime, } impl SolvableOrdersCache { @@ -129,10 +132,7 @@ impl SolvableOrdersCache { banned_users, balance_fetcher, bad_token_detector, - cache: Mutex::new(Inner { - auction: None, - update_time: Instant::now(), - }), + cache: Mutex::new(None), native_price_estimator, signature_validator, metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(), @@ -157,8 +157,12 @@ impl SolvableOrdersCache { ); } - pub fn current_auction(&self) -> Option { - self.cache.lock().unwrap().auction.clone() + pub async fn current_auction(&self) -> Option { + self.cache + .lock() + .await + .as_ref() + .map(|inner| inner.auction.clone()) } /// Manually update solvable orders. Usually called by the background @@ -169,30 +173,28 @@ impl SolvableOrdersCache { /// other's results. pub async fn update(&self, block: u64) -> Result<()> { let start = Instant::now(); - let min_valid_to = now_in_epoch_seconds() + self.min_order_validity_period.as_secs() as u32; - let db_solvable_orders = self.persistence.solvable_orders(min_valid_to).await?; - let mut counter = OrderFilterCounter::new(self.metrics, &db_solvable_orders.orders); + let (db_solvable_orders, latest_creation_timestamp) = self.get_solvable_orders().await?; + + let orders = db_solvable_orders + .orders + .values() + .cloned() + .collect::>(); + + let mut counter = OrderFilterCounter::new(self.metrics, &orders); let mut invalid_order_uids = HashSet::new(); let mut filtered_order_events = Vec::new(); let (balances, orders, cow_amms) = { - let queries = db_solvable_orders - .orders - .iter() - .map(Query::from_order) - .collect::>(); + let queries = orders.iter().map(Query::from_order).collect::>(); let cow_amms_fut = async { let _timer = self.stage_timer("cow_amm_registry"); self.cow_amm_registry.amms().await }; tokio::join!( self.fetch_balances(queries), - self.filter_invalid_orders( - db_solvable_orders.orders, - &mut counter, - &mut invalid_order_uids, - ), + self.filter_invalid_orders(orders, &mut counter, &mut invalid_order_uids,), cow_amms_fut ) }; @@ -301,10 +303,11 @@ impl SolvableOrdersCache { .collect::>()?, surplus_capturing_jit_order_owners, }; - *self.cache.lock().unwrap() = Inner { - auction: Some(auction), - update_time: Instant::now(), - }; + *self.cache.lock().await = Some(Inner { + auction, + solvable_orders: db_solvable_orders, + last_order_creation_timestamp: latest_creation_timestamp, + }); tracing::debug!(%block, "updated current auction cache"); self.metrics @@ -337,6 +340,74 @@ impl SolvableOrdersCache { .collect() } + /// Returns current solvable orders along with the latest order creation + /// timestamp. + async fn get_solvable_orders(&self) -> Result<(SolvableOrders, DateTime)> { + const INITIAL_ORDER_CREATION_TIMESTAMP: DateTime = DateTime::::MIN_UTC; + + // A new auction should be created regardless of whether new solvable orders are + // found. The incremental solvable orders cache updater should only be + // enabled after the initial full SQL query + // (`persistence::all_solvable_orders`) returned some orders. Until then, + // `MIN_UTC` is used to indicate that no orders have been found yet by + // (`persistence::all_solvable_orders`). This prevents situations where + // starting the service with a large existing DB would cause + // the incremental query to load all unfiltered orders into memory, potentially + // leading to OOM issues because incremental query doesn't filter out + // expired/invalid orders in the SQL query and basically can return the whole + // table when filters with default values are used. + let (db_solvable_orders, previous_creation_timestamp) = { + let cache_data = { + let lock = self.cache.lock().await; + match &*lock { + Some(cache) + if cache.last_order_creation_timestamp + > INITIAL_ORDER_CREATION_TIMESTAMP => + { + Some(( + cache.solvable_orders.orders.clone(), + cache.last_order_creation_timestamp, + cache.solvable_orders.latest_settlement_block, + )) + } + _ => None, + } + }; + + let min_valid_to = now_in_epoch_seconds() + + u32::try_from(self.min_order_validity_period.as_secs()) + .context("min_order_validity_period is not u32")?; + match cache_data { + Some((current_orders, last_order_creation_timestamp, latest_settlement_block)) => ( + self.persistence + .solvable_orders_after( + current_orders, + last_order_creation_timestamp, + latest_settlement_block, + min_valid_to, + ) + .await?, + last_order_creation_timestamp, + ), + None => ( + self.persistence.all_solvable_orders(min_valid_to).await?, + INITIAL_ORDER_CREATION_TIMESTAMP, + ), + } + }; + + let latest_creation_timestamp = db_solvable_orders + .orders + .values() + .map(|order| order.metadata.creation_date) + .max() + .map_or(previous_creation_timestamp, |max_creation_timestamp| { + std::cmp::max(max_creation_timestamp, previous_creation_timestamp) + }); + + Ok((db_solvable_orders, latest_creation_timestamp)) + } + /// Executed orders filtering in parallel. async fn filter_invalid_orders( &self, @@ -373,10 +444,6 @@ impl SolvableOrdersCache { orders } - pub fn last_update_time(&self) -> Instant { - self.cache.lock().unwrap().update_time - } - pub fn track_auction_update(&self, result: &str) { self.metrics .auction_update diff --git a/crates/database/Cargo.toml b/crates/database/Cargo.toml index 86efbc346c..59d00f0745 100644 --- a/crates/database/Cargo.toml +++ b/crates/database/Cargo.toml @@ -16,6 +16,7 @@ strum = { workspace = true } [dev-dependencies] hex-literal = { workspace = true } +maplit = { workspace = true } tokio = { workspace = true, features = ["macros"] } [lints] diff --git a/crates/database/src/orders.rs b/crates/database/src/orders.rs index 6079d37753..9998f94fe6 100644 --- a/crates/database/src/orders.rs +++ b/crates/database/src/orders.rs @@ -687,6 +687,46 @@ pub fn solvable_orders( sqlx::query_as(OPEN_ORDERS).bind(min_valid_to).fetch(ex) } +pub fn open_orders_after( + ex: &mut PgConnection, + after_block: i64, + after_timestamp: DateTime, +) -> BoxStream<'_, Result> { + const UPDATED_UIDS_QUERY: &str = r#" +WITH updated_uids AS ( + SELECT DISTINCT order_uid FROM ( + SELECT order_uid FROM trades WHERE block_number > $1 + UNION + SELECT order_uid FROM order_execution WHERE block_number > $1 + UNION + SELECT order_uid FROM invalidations WHERE block_number > $1 + UNION + SELECT uid AS order_uid FROM onchain_order_invalidations WHERE block_number > $1 + UNION + SELECT uid AS order_uid FROM onchain_placed_orders WHERE block_number > $1 + UNION + SELECT order_uid FROM ethflow_refunds WHERE block_number > $1 + UNION + SELECT order_uid FROM presignature_events WHERE block_number > $1 + ) +) + "#; + + #[rustfmt::skip] + const OPEN_ORDERS_AFTER: &str = const_format::concatcp!( + UPDATED_UIDS_QUERY, + " SELECT ", ORDERS_SELECT, + " FROM ", ORDERS_FROM, + " LEFT OUTER JOIN ethflow_orders eth_o on eth_o.uid = o.uid ", + " WHERE (o.creation_timestamp > $2 OR o.cancellation_timestamp > $2 OR o.uid IN (SELECT order_uid FROM updated_uids))", + ); + + sqlx::query_as(OPEN_ORDERS_AFTER) + .bind(after_block) + .bind(after_timestamp) + .fetch(ex) +} + pub async fn latest_settlement_block(ex: &mut PgConnection) -> Result { const QUERY: &str = r#" SELECT COALESCE(MAX(block_number), 0) @@ -773,9 +813,11 @@ mod tests { PgTransaction, }, bigdecimal::num_bigint::{BigInt, ToBigInt}, - chrono::{TimeZone, Utc}, + chrono::{Duration, TimeZone, Utc}, futures::{StreamExt, TryStreamExt}, + maplit::hashset, sqlx::Connection, + std::collections::HashSet, }; async fn read_order_interactions( @@ -1340,11 +1382,12 @@ mod tests { sell_amount: 1.into(), buy_amount: 1.into(), signing_scheme: SigningScheme::PreSign, + creation_timestamp: Utc::now(), ..Default::default() }; insert_order(&mut db, &order).await.unwrap(); - async fn get_order(ex: &mut PgConnection) -> Option { + async fn get_full_order(ex: &mut PgConnection) -> Option { solvable_orders(ex, 0).next().await.transpose().unwrap() } @@ -1370,19 +1413,19 @@ mod tests { } // not solvable because there is no presignature event. - assert!(get_order(&mut db).await.unwrap().presignature_pending); + assert!(get_full_order(&mut db).await.unwrap().presignature_pending); // solvable because once presignature event is observed. pre_signature_event(&mut db, 0, order.owner, order.uid, true).await; - assert!(!get_order(&mut db).await.unwrap().presignature_pending); + assert!(!get_full_order(&mut db).await.unwrap().presignature_pending); // not solvable because "unsigned" presignature event. pre_signature_event(&mut db, 1, order.owner, order.uid, false).await; - assert!(get_order(&mut db).await.unwrap().presignature_pending); + assert!(get_full_order(&mut db).await.unwrap().presignature_pending); // solvable once again because of new presignature event. pre_signature_event(&mut db, 2, order.owner, order.uid, true).await; - assert!(!get_order(&mut db).await.unwrap().presignature_pending); + assert!(!get_full_order(&mut db).await.unwrap().presignature_pending); } #[tokio::test] @@ -1430,11 +1473,12 @@ mod tests { buy_amount: 100.into(), valid_to: 3, partially_fillable: true, + creation_timestamp: Utc::now(), ..Default::default() }; insert_order(&mut db, &order).await.unwrap(); - async fn get_order(ex: &mut PgConnection, min_valid_to: i64) -> Option { + async fn get_full_order(ex: &mut PgConnection, min_valid_to: i64) -> Option { solvable_orders(ex, min_valid_to) .next() .await @@ -1443,7 +1487,7 @@ mod tests { } // not solvable because valid to - assert!(get_order(&mut db, 4).await.is_none()); + assert!(get_full_order(&mut db, 4).await.is_none()); // not solvable because fully executed crate::events::append( @@ -1462,7 +1506,7 @@ mod tests { ) .await .unwrap(); - assert!(get_order(&mut db, 0).await.is_none()); + assert!(get_full_order(&mut db, 0).await.is_none()); crate::events::delete(&mut db, 0).await.unwrap(); // not solvable because invalidated @@ -1480,11 +1524,11 @@ mod tests { ) .await .unwrap(); - assert!(get_order(&mut db, 0).await.is_none()); + assert!(get_full_order(&mut db, 0).await.is_none()); crate::events::delete(&mut db, 0).await.unwrap(); // solvable - assert!(get_order(&mut db, 3).await.is_some()); + assert!(get_full_order(&mut db, 3).await.is_some()); // still solvable because only partially filled crate::events::append( @@ -1503,7 +1547,7 @@ mod tests { ) .await .unwrap(); - assert!(get_order(&mut db, 3).await.is_some()); + assert!(get_full_order(&mut db, 3).await.is_some()); //no longer solvable, if it is a ethflow-order //with shorter user_valid_to from the ethflow @@ -1515,8 +1559,8 @@ mod tests { .await .unwrap(); - assert!(get_order(&mut db, 3).await.is_none()); - assert!(get_order(&mut db, 2).await.is_some()); + assert!(get_full_order(&mut db, 3).await.is_none()); + assert!(get_full_order(&mut db, 2).await.is_some()); // no longer solvable, if there was also a onchain order // placement error @@ -1532,7 +1576,277 @@ mod tests { .await .unwrap(); - assert!(get_order(&mut db, 2).await.is_none()); + assert!(get_full_order(&mut db, 2).await.is_none()); + } + + #[tokio::test] + #[ignore] + async fn postgres_open_orders_after() { + let mut db = PgConnection::connect("postgresql://").await.unwrap(); + let mut db = db.begin().await.unwrap(); + crate::clear_DANGER_(&mut db).await.unwrap(); + + async fn get_open_orders_after( + ex: &mut PgConnection, + after_block: i64, + after_timestamp: DateTime, + ) -> HashSet { + open_orders_after(ex, after_block, after_timestamp) + .map_ok(|o| o.uid) + .try_collect() + .await + .unwrap() + } + + let now = Utc::now(); + let order_a = Order { + uid: ByteArray([1u8; 56]), + creation_timestamp: now, + ..Default::default() + }; + insert_order(&mut db, &order_a).await.unwrap(); + let order_b = Order { + uid: ByteArray([2u8; 56]), + creation_timestamp: now + Duration::seconds(10), + ..Default::default() + }; + insert_order(&mut db, &order_b).await.unwrap(); + let order_c = Order { + uid: ByteArray([3u8; 56]), + cancellation_timestamp: Some(now + Duration::seconds(20)), + ..Default::default() + }; + insert_order(&mut db, &order_c).await.unwrap(); + let order_d = Order { + uid: ByteArray([4u8; 56]), + cancellation_timestamp: Some(now + Duration::seconds(25)), + ..Default::default() + }; + insert_order(&mut db, &order_d).await.unwrap(); + let order_e = Order { + uid: ByteArray([5u8; 56]), + cancellation_timestamp: Some(now + Duration::seconds(25)), + ..Default::default() + }; + insert_order(&mut db, &order_e).await.unwrap(); + + // Check fetching by timestamp only. + // Early timestamp should cover all the orders. + assert_eq!( + get_open_orders_after(&mut db, Default::default(), now - Duration::seconds(1)).await, + hashset![ + ByteArray([1u8; 56]), + ByteArray([2u8; 56]), + ByteArray([3u8; 56]), + ByteArray([4u8; 56]), + ByteArray([5u8; 56]), + ] + ); + // First order created at `now` timestamp. + assert_eq!( + get_open_orders_after(&mut db, Default::default(), now).await, + hashset![ + ByteArray([2u8; 56]), + ByteArray([3u8; 56]), + ByteArray([4u8; 56]), + ByteArray([5u8; 56]) + ] + ); + // First to orders created before `now + 10s` timestamp. + assert_eq!( + get_open_orders_after(&mut db, Default::default(), now + Duration::seconds(10)).await, + hashset![ + ByteArray([3u8; 56]), + ByteArray([4u8; 56]), + ByteArray([5u8; 56]) + ] + ); + + // Check fetching by block number. + let future_timestamp = now + Duration::seconds(50000); + // trades table + let (index_a, event_a) = { + ( + EventIndex { + block_number: 1, + ..Default::default() + }, + Trade { + order_uid: ByteArray([1u8; 56]), + sell_amount_including_fee: BigDecimal::from(10), + buy_amount: BigDecimal::from(100), + fee_amount: BigDecimal::from(1), + }, + ) + }; + let (index_b, event_b) = { + ( + EventIndex { + block_number: 2, + ..Default::default() + }, + Trade { + order_uid: ByteArray([1u8; 56]), + sell_amount_including_fee: BigDecimal::from(20), + buy_amount: BigDecimal::from(200), + fee_amount: BigDecimal::from(2), + }, + ) + }; + let (index_c, event_c) = { + ( + EventIndex { + block_number: 1, + log_index: 1, + }, + Trade { + order_uid: ByteArray([2u8; 56]), + sell_amount_including_fee: BigDecimal::from(40), + buy_amount: BigDecimal::from(400), + fee_amount: BigDecimal::from(4), + }, + ) + }; + + crate::events::insert_trade(&mut db, &index_a, &event_a) + .await + .unwrap(); + crate::events::insert_trade(&mut db, &index_b, &event_b) + .await + .unwrap(); + crate::events::insert_trade(&mut db, &index_c, &event_c) + .await + .unwrap(); + + // No events after block 2. + assert!(get_open_orders_after(&mut db, 2, future_timestamp) + .await + .is_empty()); + assert_eq!( + get_open_orders_after(&mut db, 0, future_timestamp).await, + hashset![ByteArray([1u8; 56]), ByteArray([2u8; 56])] + ); + + // order_execution table + crate::order_execution::save(&mut db, &ByteArray([1u8; 56]), 1, 1, &BigDecimal::from(1)) + .await + .unwrap(); + crate::order_execution::save(&mut db, &ByteArray([1u8; 56]), 2, 2, &BigDecimal::from(2)) + .await + .unwrap(); + crate::order_execution::save(&mut db, &ByteArray([1u8; 56]), 3, 0, &BigDecimal::from(4)) + .await + .unwrap(); + crate::order_execution::save(&mut db, &ByteArray([3u8; 56]), 2, 3, &BigDecimal::from(4)) + .await + .unwrap(); + + assert_eq!( + get_open_orders_after(&mut db, 0, future_timestamp).await, + hashset![ + ByteArray([1u8; 56]), + ByteArray([2u8; 56]), + ByteArray([3u8; 56]) + ] + ); + + // invalidations table + let invalidation_events = vec![ + ( + EventIndex { + block_number: 1, + ..Default::default() + }, + Event::Invalidation(Invalidation { + order_uid: ByteArray([1u8; 56]), + }), + ), + ( + EventIndex { + block_number: 0, + ..Default::default() + }, + Event::Invalidation(Invalidation { + order_uid: ByteArray([2u8; 56]), + }), + ), + ( + EventIndex { + block_number: 2, + log_index: 1, + }, + Event::Invalidation(Invalidation { + order_uid: ByteArray([4u8; 56]), + }), + ), + ]; + crate::events::append(&mut db, &invalidation_events) + .await + .unwrap(); + + assert_eq!( + get_open_orders_after(&mut db, 0, future_timestamp).await, + hashset![ + ByteArray([1u8; 56]), + ByteArray([2u8; 56]), + ByteArray([3u8; 56]), + ByteArray([4u8; 56]) + ] + ); + + // onchain_order_invalidations table + insert_onchain_invalidation( + &mut db, + &EventIndex { + block_number: 0, + ..Default::default() + }, + &ByteArray([3u8; 56]), + ) + .await + .unwrap(); + insert_onchain_invalidation( + &mut db, + &EventIndex { + block_number: 1, + ..Default::default() + }, + &ByteArray([2u8; 56]), + ) + .await + .unwrap(); + insert_onchain_invalidation( + &mut db, + &EventIndex { + block_number: 1, + ..Default::default() + }, + &ByteArray([5u8; 56]), + ) + .await + .unwrap(); + + assert_eq!( + get_open_orders_after(&mut db, 0, future_timestamp).await, + hashset![ + ByteArray([1u8; 56]), + ByteArray([2u8; 56]), + ByteArray([3u8; 56]), + ByteArray([4u8; 56]), + ByteArray([5u8; 56]) + ] + ); + + // Combined query. Order 3 has event after block 2 and order 4, 5 have creation + // timestamp after `now + 20s`. + assert_eq!( + get_open_orders_after(&mut db, 2, now + Duration::seconds(20)).await, + hashset![ + ByteArray([3u8; 56]), + ByteArray([4u8; 56]), + ByteArray([5u8; 56]), + ] + ); } type Data = ([u8; 56], Address, DateTime); @@ -1847,11 +2161,12 @@ mod tests { .await .unwrap(); + let fee: BigDecimal = 0.into(); let order = single_full_order(&mut db, &order_uid) .await .unwrap() .unwrap(); - assert_eq!(order.executed_surplus_fee, 0.into()); + assert_eq!(order.executed_surplus_fee, fee); let fee: BigDecimal = 1.into(); crate::order_execution::save(&mut db, &order_uid, 1, 0, &fee) diff --git a/crates/orderbook/src/database/orders.rs b/crates/orderbook/src/database/orders.rs index 5b563bfe60..e496ff6978 100644 --- a/crates/orderbook/src/database/orders.rs +++ b/crates/orderbook/src/database/orders.rs @@ -118,10 +118,11 @@ async fn cancel_order( } async fn insert_order(order: &Order, ex: &mut PgConnection) -> Result<(), InsertionError> { + let order_uid = ByteArray(order.metadata.uid.0); insert_order_event( ex, &OrderEvent { - order_uid: ByteArray(order.metadata.uid.0), + order_uid, timestamp: Utc::now(), label: OrderEventLabel::Created, }, @@ -157,7 +158,7 @@ async fn insert_order(order: &Order, ex: &mut PgConnection) -> Result<(), Insert .collect::>(); let order = database::orders::Order { - uid: ByteArray(order.metadata.uid.0), + uid: order_uid, owner: ByteArray(order.metadata.owner.0), creation_timestamp: order.metadata.creation_date, sell_token: ByteArray(order.data.sell_token.0), diff --git a/database/sql/V069__create_indexes_for_solvable_orders_search.sql b/database/sql/V069__create_indexes_for_solvable_orders_search.sql new file mode 100644 index 0000000000..53f385ac89 --- /dev/null +++ b/database/sql/V069__create_indexes_for_solvable_orders_search.sql @@ -0,0 +1,4 @@ +-- Creates new indexes required for the incremental solvable orders cache update process. +CREATE INDEX order_creation_cancellation ON orders USING BTREE (creation_timestamp, cancellation_timestamp); +CREATE INDEX order_execution_block_number ON order_execution USING BTREE (block_number); +CREATE INDEX ethflow_refunds_block_number ON ethflow_refunds USING BTREE (block_number);