Skip to content

Commit

Permalink
Query fix
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Aug 23, 2024
1 parent 6d012c5 commit 8551f81
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 263 deletions.
62 changes: 61 additions & 1 deletion crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ pub use {
},
shared::order_validation::{is_order_outside_market_price, Amounts},
};
use {crate::domain, ethrpc::Web3, std::collections::HashMap, url::Url};
use {
crate::domain,
ethrpc::Web3,
model::time::now_in_epoch_seconds,
number::conversions::u256_to_big_uint,
std::collections::{HashMap, HashSet},
url::Url,
};

pub mod events;
pub mod order;
Expand All @@ -46,3 +53,56 @@ pub struct SolvableOrders {
pub quotes: HashMap<domain::OrderUid, domain::Quote>,
pub latest_settlement_block: u64,
}

impl SolvableOrders {
pub fn combine_with(&self, other: Self) -> Self {
let mut orders = self.orders.clone();
let mut quotes = self.quotes.clone();
for (uid, new_order) in other.orders {
orders.insert(uid, new_order);
}
for (uid, quote) in other.quotes {
quotes.insert(uid, quote);
}

let now = now_in_epoch_seconds();
orders.retain(|_uid, order| {
let expired = || {
order.data.valid_to < now
|| order
.metadata
.ethflow_data
.as_ref()
.is_some_and(|data| data.user_valid_to < now as i64)
};
let onchain_error = || {
order
.metadata
.onchain_order_data
.as_ref()
.is_some_and(|data| data.placement_error.is_some())
};
let fulfilled = || match order.data.kind {
OrderKind::Sell => {
order.metadata.executed_sell_amount >= u256_to_big_uint(&order.data.sell_amount)
}
OrderKind::Buy => {
order.metadata.executed_buy_amount >= u256_to_big_uint(&order.data.buy_amount)
}
};

!order.metadata.invalidated && !onchain_error() && !expired() && !fulfilled()
});

let order_uids = orders.keys().collect::<HashSet<_>>();
quotes.retain(|uid, _quote| order_uids.contains(uid));

Self {
orders,
quotes,
latest_settlement_block: self
.latest_settlement_block
.max(other.latest_settlement_block),
}
}
}
37 changes: 37 additions & 0 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,43 @@ impl Postgres {
})
}

pub async fn orders_after(
&self,
after_timestamp: DateTime<Utc>,
min_valid_to: u32,
) -> Result<boundary::SolvableOrders> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["solvable_orders_after"])
.start_timer();

let mut ex = self.pool.begin().await?;
// Set the transaction isolation level to REPEATABLE READ
// so the both SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(ex.deref_mut())
.await?;

let orders: HashMap<domain::OrderUid, Order> =
database::orders::full_orders_after(&mut ex, after_timestamp, min_valid_to as i64)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block =
database::orders::latest_settlement_block(&mut ex).await? as u64;
let quotes = self.read_quotes(orders.keys()).await?;
Ok(boundary::SolvableOrders {
orders,
quotes,
latest_settlement_block,
})
}

pub async fn replace_current_auction(&self, auction: &dto::Auction) -> Result<dto::AuctionId> {
let _timer = super::Metrics::get()
.database_queries
Expand Down
44 changes: 11 additions & 33 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use {
boundary::database::byte_array::ByteArray,
chrono::{DateTime, Utc},
database::{order_events::OrderEventLabel, settlement_observations::Observation},
futures::TryStreamExt,
number::conversions::{big_decimal_to_u256, u256_to_big_decimal},
primitive_types::{H160, H256},
std::{
Expand Down Expand Up @@ -69,6 +68,17 @@ impl Persistence {
.map_err(DatabaseError)
}

pub async fn orders_after(
&self,
after_timestamp: DateTime<Utc>,
min_valid_to: u32,
) -> Result<boundary::SolvableOrders, DatabaseError> {
self.postgres
.orders_after(after_timestamp, min_valid_to)
.await
.map_err(DatabaseError)
}

pub async fn read_quotes(
&self,
orders: impl Iterator<Item = &domain::OrderUid>,
Expand Down Expand Up @@ -390,38 +400,6 @@ impl Persistence {
Ok(solution)
}

pub async fn orders_after(
&self,
after_timestamp: DateTime<Utc>,
min_valid_to: i64,
) -> anyhow::Result<Vec<database::orders::FullOrder>> {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["orders_after"])
.start_timer();
let mut ex = self.postgres.pool.acquire().await.context("begin")?;
Ok(
database::orders::full_orders_after(&mut ex, after_timestamp, min_valid_to)
.try_collect()
.await?,
)
}

pub async fn trades_after(
&self,
after_block: i64,
) -> anyhow::Result<HashMap<domain::OrderUid, database::trades::TradedAmounts>> {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["trades_after"])
.start_timer();
let mut ex = self.postgres.pool.acquire().await.context("begin")?;
Ok(database::trades::trades_after(&mut ex, after_block)
.map_ok(|trade| (domain::OrderUid(trade.order_uid.0), trade))
.try_collect()
.await?)
}

/// Returns the oldest settlement event for which the accociated auction is
/// not yet populated in the database.
pub async fn get_settlement_without_auction(
Expand Down
133 changes: 23 additions & 110 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use {
signature::Signature,
time::now_in_epoch_seconds,
},
number::conversions::{big_decimal_to_u256, u256_to_big_decimal},
number::conversions::u256_to_big_decimal,
primitive_types::{H160, H256, U256},
prometheus::{
Histogram,
Expand All @@ -31,7 +31,6 @@ use {
shared::{
account_balances::{BalanceFetching, Query},
bad_token::BadTokenDetecting,
db_order_conversions::full_order_into_model_order,
price_estimation::{
native::NativePriceEstimating,
native_price_cache::CachingNativePriceEstimator,
Expand Down Expand Up @@ -167,92 +166,6 @@ impl SolvableOrdersCache {
.map(|inner| inner.auction.clone())
}

fn build_solvable_orders(
current_orders: &boundary::SolvableOrders,
new_orders: Vec<database::orders::FullOrder>,
mut new_trades: HashMap<domain::OrderUid, database::trades::TradedAmounts>,
new_quotes: HashMap<domain::OrderUid, domain::Quote>,
) -> Result<boundary::SolvableOrders> {
let now = now_in_epoch_seconds();
let mut orders = current_orders.orders.clone();
// Remove expired orders.
orders.retain(|_uid, order| {
order.data.valid_to >= now
&& !order
.metadata
.ethflow_data
.as_ref()
.is_some_and(|data| data.user_valid_to < now as i64)
});
let mut quotes = current_orders.quotes.clone();
let mut latest_trade_block = current_orders.latest_settlement_block;

for new_order in new_orders {
let uid = domain::OrderUid(new_order.uid.0);

if new_order.onchain_placement_error.is_some() || new_order.invalidated {
orders.remove(&uid);
quotes.remove(&uid);
continue;
}

// Drop already visited trades to slightly reduce time complexity.
let Some(trade_amounts) = new_trades.remove(&uid) else {
continue;
};

let fulfilled = match new_order.kind {
database::orders::OrderKind::Sell => {
trade_amounts.sell_amount >= new_order.sell_amount
}
database::orders::OrderKind::Buy => {
trade_amounts.buy_amount >= new_order.buy_amount
}
};

if !fulfilled {
let order = full_order_into_model_order(new_order).map_err(anyhow::Error::from)?;
orders.insert(uid, order);

if let Some(new_quote) = new_quotes.get(&uid) {
quotes.insert(uid, new_quote.clone());
}
} else {
orders.remove(&uid);
quotes.remove(&uid);
}
latest_trade_block = latest_trade_block.max(trade_amounts.block_number as u64);
}

// Iterate over remaining trades to drop orders that are already fulfilled.
for (uid, trade_amounts) in new_trades {
if let Some(order) = orders.get(&uid) {
let fulfilled = match order.data.kind {
model::order::OrderKind::Sell => {
big_decimal_to_u256(&trade_amounts.sell_amount).context("U256 overflow")?
>= order.data.sell_amount
}
model::order::OrderKind::Buy => {
big_decimal_to_u256(&trade_amounts.buy_amount).context("U256 overflow")?
>= order.data.buy_amount
}
};

if fulfilled {
orders.remove(&uid);
quotes.remove(&uid);
}
}
latest_trade_block = latest_trade_block.max(trade_amounts.block_number as u64);
}

Ok(boundary::SolvableOrders {
orders,
quotes,
latest_settlement_block: latest_trade_block,
})
}

/// Manually update solvable orders. Usually called by the background
/// updating task.
///
Expand All @@ -265,7 +178,11 @@ impl SolvableOrdersCache {
let db_solvable_orders = {
let lock = self.cache.lock().await;
if let Some(cache) = &*lock {
self.updated_solvable_orders(min_valid_to, cache).await?
let new_orders = self
.persistence
.orders_after(cache.last_order_creation_timestamp, min_valid_to)
.await?;
cache.solvable_orders.combine_with(new_orders)
} else {
self.persistence.all_solvable_orders(min_valid_to).await?
}
Expand Down Expand Up @@ -420,27 +337,23 @@ impl SolvableOrdersCache {
Ok(())
}

async fn updated_solvable_orders(
&self,
min_valid_to: u32,
cache: &Inner,
) -> Result<boundary::SolvableOrders> {
let new_orders_fut = self
.persistence
.orders_after(cache.last_order_creation_timestamp, min_valid_to as i64);
let new_trades_fut = self.persistence.trades_after(
i64::try_from(cache.solvable_orders.latest_settlement_block)
.context("block number value exceeds i64")?,
);
let (new_orders, new_trades) = tokio::try_join!(new_orders_fut, new_trades_fut)?;
let order_uids = new_orders
.iter()
.map(|order| domain::OrderUid(order.uid.0))
.collect::<Vec<_>>();
let quotes = self.persistence.read_quotes(order_uids.iter()).await?;

Self::build_solvable_orders(&cache.solvable_orders, new_orders, new_trades, quotes)
}
// async fn updated_solvable_orders(
// &self,
// min_valid_to: u32,
// cache: &Inner,
// ) -> Result<boundary::SolvableOrders> {
// let new_orders_fut = self
// .persistence
// .orders_after(cache.last_order_creation_timestamp, min_valid_to as
// i64); let (new_orders, new_trades) = tokio::try_join!(new_orders_fut,
// new_trades_fut)?; let order_uids = new_orders
// .iter()
// .map(|order| domain::OrderUid(order.uid.0))
// .collect::<Vec<_>>();
// let quotes = self.persistence.read_quotes(order_uids.iter()).await?;
//
// Self::build_solvable_orders(&cache.solvable_orders, new_orders,
// new_trades, quotes) }

async fn fetch_balances(&self, queries: Vec<Query>) -> HashMap<Query, U256> {
let fetched_balances = {
Expand Down
4 changes: 2 additions & 2 deletions crates/database/src/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ pub fn full_orders_after(
" LEFT OUTER JOIN ethflow_orders eth_o on eth_o.uid = o.uid ",
" WHERE (o.creation_timestamp > $1 OR o.cancellation_timestamp > $1)",
" AND o.valid_to >= $2",
" AND CASE WHEN eth_o.valid_to IS NULL THEN true ELSE eth_o.valid_to >= $1 END",
" AND CASE WHEN eth_o.valid_to IS NULL THEN true ELSE eth_o.valid_to >= $2 END",
);
sqlx::query_as(QUERY)
.bind(after_timestamp)
Expand Down Expand Up @@ -1619,7 +1619,7 @@ mod tests {

#[tokio::test]
#[ignore]
async fn postgres_orders_without_trades_after() {
async fn postgres_orders_after() {
let mut db = PgConnection::connect("postgresql://").await.unwrap();
let mut db = db.begin().await.unwrap();
crate::clear_DANGER_(&mut db).await.unwrap();
Expand Down
Loading

0 comments on commit 8551f81

Please sign in to comment.