Skip to content

Commit

Permalink
Incremental solvable orders cache update (#2923)
Browse files Browse the repository at this point in the history
# Description
> Updating solvable orders (ie creating a new auction) currently takes
>2s with some pretty heavy outliers
([logs](https://production-6de61f.kb.eu-central-1.aws.cloud.es.io/app/r/s/ALsEK))
> 
> This makes it hard to bring CoW protocol's auction rate down to one
batch per block as simply creating up to date state would take >15% of
the total time we have at hand. We should at least be able to half this
time (if not getting it down even more)

In order to relieve the situation, it was proposed to introduce
incremental solvable orders cache update, which selects all the solvable
orders using the old heavy query only at startup, stores the latest
received order's creation timestamp in memory, and then makes much
faster incremental bounded queries to the orders and additional tables
that select fewer data and executes faster.

# Changes

Since incremental fetching retrieves orders created/cancelled after the
specific timestamps, it is also required now to fetch orders that have
any onchain update based on the last fetched block number. Having said
that, the data needs to be fetched within a single TX, so there is no
way to run all the queries in parallel.

1. If the current solvable orders cache is empty, execute the original
heavy SQL query to fetch all current solvable orders and store them in
memory.
2. Otherwise, fetch full orders that created or cancelled after the last
stored timestamp and also find UIDs of the order that have any onchain
data updated after the latest observed block number. This includes
fetching data from the following tables: trades, ethflow_data,
order_execution, invalidations, onchain_order_invalidations,
onchain_placed_orders, presignature_events.
3. Fetch quotes for all the collected orders.
4. Add all the newly received orders to the cache.
5. Filter out all the orders that are one of: contain on-chain errors,
expired, fulfilled, invalidated.
6. Calculate the latest observed order creation timestamp.
7. Continue with the regular auction creation process.

As a result, we now have 3 SQL queries where each executes in ~50ms
instead of a single one taking ~2s.

## How to test
New DB tests. Existing e2e tests.

## Related Issues

Fixes #2831
  • Loading branch information
squadgazzz authored Sep 5, 2024
1 parent 12aa87b commit ed09b91
Show file tree
Hide file tree
Showing 11 changed files with 607 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web
}

pub struct SolvableOrders {
pub orders: Vec<model::order::Order>,
pub orders: HashMap<domain::OrderUid, model::order::Order>,
pub quotes: HashMap<domain::OrderUid, domain::Quote>,
pub latest_settlement_block: u64,
}
37 changes: 17 additions & 20 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use {
chrono::{DateTime, Utc},
futures::{StreamExt, TryStreamExt},
model::{order::Order, quote::QuoteId},
num::ToPrimitive,
shared::{
db_order_conversions::full_order_into_model_order,
event_storing_helpers::{create_db_search_parameters, create_quote_row},
order_quoting::{QuoteData, QuoteSearchParameters, QuoteStoring},
},
std::ops::DerefMut,
std::{collections::HashMap, ops::DerefMut},
};

#[async_trait::async_trait]
Expand Down Expand Up @@ -60,7 +61,7 @@ impl QuoteStoring for Postgres {
}

impl Postgres {
pub async fn solvable_orders(&self, min_valid_to: u32) -> Result<boundary::SolvableOrders> {
pub async fn all_solvable_orders(&self, min_valid_to: u32) -> Result<boundary::SolvableOrders> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["solvable_orders"])
Expand All @@ -73,24 +74,20 @@ impl Postgres {
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(ex.deref_mut())
.await?;
let orders: Vec<Order> = database::orders::solvable_orders(&mut ex, min_valid_to as i64)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block =
database::orders::latest_settlement_block(&mut ex).await? as u64;
let quotes = self
.read_quotes(
orders
.iter()
.map(|order| domain::OrderUid(order.metadata.uid.0))
.collect::<Vec<_>>()
.iter(),
)
.await?;
let orders: HashMap<domain::OrderUid, Order> =
database::orders::solvable_orders(&mut ex, i64::from(min_valid_to))
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block = database::orders::latest_settlement_block(&mut ex)
.await?
.to_u64()
.context("latest_settlement_block is not u64")?;
let quotes = self.read_quotes(orders.keys()).await?;
Ok(boundary::SolvableOrders {
orders,
quotes,
Expand Down
5 changes: 5 additions & 0 deletions crates/autopilot/src/database/quotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ impl Postgres {
&self,
orders: impl Iterator<Item = &domain::OrderUid>,
) -> Result<HashMap<domain::OrderUid, domain::Quote>, sqlx::Error> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["read_quotes"])
.start_timer();

let mut ex = self.pool.acquire().await?;
let order_uids: Vec<_> = orders.map(|uid| ByteArray(uid.0)).collect();
let quotes: HashMap<_, _> = database::orders::read_quotes(&mut ex, &order_uids)
Expand Down
147 changes: 143 additions & 4 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use {
infra::persistence::dto::AuctionId,
},
anyhow::Context,
bigdecimal::ToPrimitive,
boundary::database::byte_array::ByteArray,
chrono::Utc,
chrono::{DateTime, Utc},
database::{
order_events::OrderEventLabel,
orders::{
Expand All @@ -22,10 +23,14 @@ use {
SellTokenSource as DomainSellTokenSource,
SigningScheme as DomainSigningScheme,
},
number::conversions::{big_decimal_to_u256, u256_to_big_decimal},
futures::{StreamExt, TryStreamExt},
itertools::Itertools,
number::conversions::{big_decimal_to_u256, u256_to_big_decimal, u256_to_big_uint},
primitive_types::{H160, H256},
shared::db_order_conversions::full_order_into_model_order,
std::{
collections::{HashMap, HashSet},
ops::DerefMut,
sync::Arc,
},
tracing::Instrument,
Expand Down Expand Up @@ -71,12 +76,13 @@ impl Persistence {
.map_err(DatabaseError)
}

pub async fn solvable_orders(
/// Finds solvable orders based on the order's min validity period.
pub async fn all_solvable_orders(
&self,
min_valid_to: u32,
) -> Result<boundary::SolvableOrders, DatabaseError> {
self.postgres
.solvable_orders(min_valid_to)
.all_solvable_orders(min_valid_to)
.await
.map_err(DatabaseError)
}
Expand Down Expand Up @@ -395,6 +401,139 @@ impl Persistence {
Ok(solution)
}

/// Computes solvable orders based on the latest observed block number,
/// order creation timestamp, and minimum validity period.
pub async fn solvable_orders_after(
&self,
current_orders: HashMap<domain::OrderUid, model::order::Order>,
after_timestamp: DateTime<Utc>,
after_block: u64,
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
let after_block = i64::try_from(after_block).context("block number value exceeds i64")?;
let mut tx = self.postgres.pool.begin().await.context("begin")?;
// Set the transaction isolation level to REPEATABLE READ
// so all the SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(tx.deref_mut())
.await?;

// Fetch the orders that were updated after the given block and were created or
// cancelled after the given timestamp.
let next_orders: HashMap<domain::OrderUid, model::order::Order> = {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["open_orders_after"])
.start_timer();

database::orders::open_orders_after(&mut tx, after_block, after_timestamp)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?
};

// Fetch quotes for new orders and also update them for the cached ones since
// they could also be updated.
let updated_quotes = {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["read_quotes"])
.start_timer();

let all_order_uids = next_orders
.keys()
.chain(current_orders.keys())
.unique()
.map(|uid| ByteArray(uid.0))
.collect::<Vec<_>>();

database::orders::read_quotes(&mut tx, &all_order_uids)
.await?
.into_iter()
.filter_map(|quote| {
let order_uid = domain::OrderUid(quote.order_uid.0);
dto::quote::into_domain(quote)
.map_err(|err| {
tracing::warn!(?order_uid, ?err, "failed to convert quote from db")
})
.ok()
.map(|quote| (order_uid, quote))
})
.collect()
};

let latest_settlement_block = database::orders::latest_settlement_block(&mut tx)
.await?
.to_u64()
.context("latest_settlement_block is not u64")?;

Self::build_solvable_orders(
current_orders,
next_orders,
updated_quotes,
latest_settlement_block,
min_valid_to,
)
}

fn build_solvable_orders(
mut current_orders: HashMap<domain::OrderUid, model::order::Order>,
next_orders: HashMap<domain::OrderUid, model::order::Order>,
mut next_quotes: HashMap<domain::OrderUid, domain::Quote>,
latest_settlement_block: u64,
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
// Blindly insert all new orders into the cache.
for (uid, order) in next_orders {
current_orders.insert(uid, order);
}

// Filter out all the invalid orders.
current_orders.retain(|_uid, order| {
let expired = order.data.valid_to < min_valid_to
|| order
.metadata
.ethflow_data
.as_ref()
.is_some_and(|data| data.user_valid_to < i64::from(min_valid_to));

let invalidated = order.metadata.invalidated;
let onchain_error = order
.metadata
.onchain_order_data
.as_ref()
.is_some_and(|data| data.placement_error.is_some());
let fulfilled = {
match order.data.kind {
model::order::OrderKind::Sell => {
order.metadata.executed_sell_amount
>= u256_to_big_uint(&order.data.sell_amount)
}
model::order::OrderKind::Buy => {
order.metadata.executed_buy_amount
>= u256_to_big_uint(&order.data.buy_amount)
}
}
};

!expired && !invalidated && !onchain_error && !fulfilled
});

// Keep only relevant quotes.
next_quotes.retain(|uid, _quote| current_orders.contains_key(uid));

Ok(boundary::SolvableOrders {
orders: current_orders,
quotes: next_quotes,
latest_settlement_block,
})
}

/// Returns the oldest settlement event for which the accociated auction is
/// not yet populated in the database.
pub async fn get_settlement_without_auction(
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl RunLoop {
}

async fn cut_auction(&self) -> Option<domain::AuctionWithId> {
let auction = match self.solvable_orders_cache.current_auction() {
let auction = match self.solvable_orders_cache.current_auction().await {
Some(auction) => auction,
None => {
tracing::debug!("no current auction");
Expand Down
Loading

0 comments on commit ed09b91

Please sign in to comment.