Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental solvable orders cache update #2923

Merged
merged 110 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 107 commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
bee782a
Update the cache structure
squadgazzz Aug 20, 2024
0b8c97d
Define new struct
squadgazzz Aug 20, 2024
22abb5e
Fetch functions
squadgazzz Aug 20, 2024
ab2f8cb
update_solvable_orders
squadgazzz Aug 22, 2024
e6a4a64
Fetch in parallel
squadgazzz Aug 22, 2024
6984418
Minor
squadgazzz Aug 22, 2024
ac2f36a
Naming
squadgazzz Aug 22, 2024
efb7389
BoxStream
squadgazzz Aug 22, 2024
c185443
Tests
squadgazzz Aug 22, 2024
3a12bac
Naming
squadgazzz Aug 23, 2024
5786d82
Tests
squadgazzz Aug 23, 2024
e7165f6
Trades test
squadgazzz Aug 23, 2024
d630e9e
Indexes
squadgazzz Aug 23, 2024
fe50a81
Naming
squadgazzz Aug 23, 2024
350191f
Redundant index
squadgazzz Aug 23, 2024
a2262cb
Remove expired orders
squadgazzz Aug 23, 2024
298e31a
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Aug 23, 2024
cea8850
Fix after merge
squadgazzz Aug 23, 2024
8eb3e55
Fix test
squadgazzz Aug 23, 2024
c883090
Redundant tx
squadgazzz Aug 23, 2024
53969d5
Minor
squadgazzz Aug 23, 2024
3dc9243
Check eth flow timestamp
squadgazzz Aug 23, 2024
2d0813b
Update executed amounts properly
squadgazzz Aug 23, 2024
6d012c5
Revert back to FullOrder
squadgazzz Aug 23, 2024
8551f81
Query fix
squadgazzz Aug 23, 2024
efc0d58
Refactoring
squadgazzz Aug 23, 2024
c173a33
Docs
squadgazzz Aug 23, 2024
d8b12a4
Commented code
squadgazzz Aug 23, 2024
a5a0769
Fix
squadgazzz Aug 23, 2024
25faa4e
Comment
squadgazzz Aug 23, 2024
d34cc12
Redundant option
squadgazzz Aug 23, 2024
8868079
Revert "Redundant option"
squadgazzz Aug 23, 2024
1d2efee
Revert "Comment"
squadgazzz Aug 23, 2024
b5a869e
Revert "Fix"
squadgazzz Aug 23, 2024
b2a04f9
Revert "Commented code"
squadgazzz Aug 23, 2024
b06e2a9
Revert "Docs"
squadgazzz Aug 23, 2024
396fc43
Revert "Refactoring"
squadgazzz Aug 23, 2024
157e6c0
Revert "Query fix"
squadgazzz Aug 23, 2024
f93f2f6
Revert "Revert back to FullOrder"
squadgazzz Aug 23, 2024
902687b
Fix
squadgazzz Aug 23, 2024
35cd677
Fix
squadgazzz Aug 26, 2024
8ca4434
Fetch onchain data
squadgazzz Aug 27, 2024
945b7cc
Ethflow data
squadgazzz Aug 27, 2024
e9d5463
Migrate fields
squadgazzz Aug 27, 2024
de7324c
Presignature
squadgazzz Aug 27, 2024
9e05929
Move field
squadgazzz Aug 27, 2024
c1a5cfa
Naming
squadgazzz Aug 27, 2024
11d9db5
Execute everything in a single TX
squadgazzz Aug 27, 2024
85c9d16
Naming
squadgazzz Aug 27, 2024
9035848
Naming
squadgazzz Aug 28, 2024
a0f1d9e
Fixed doc
squadgazzz Aug 28, 2024
b70a134
Repeatable read isolation level
squadgazzz Aug 28, 2024
5b9d146
Updates after test
squadgazzz Aug 28, 2024
62907ed
Onchain order tests
squadgazzz Aug 28, 2024
6e61210
Refunds test
squadgazzz Aug 28, 2024
7004db3
Presignature events test
squadgazzz Aug 28, 2024
c20b156
More indexes
squadgazzz Aug 28, 2024
8eafd2b
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Aug 28, 2024
a9262dd
Redundant import
squadgazzz Aug 28, 2024
065bed0
Fix
squadgazzz Aug 28, 2024
0e51662
Fix
squadgazzz Aug 28, 2024
1720b1c
Empty commit
squadgazzz Aug 28, 2024
772520b
Update interactions
squadgazzz Aug 28, 2024
157c126
Metric name
squadgazzz Aug 28, 2024
779d6b9
Fix
squadgazzz Aug 28, 2024
1d3b179
Docs
squadgazzz Aug 28, 2024
65a630a
Fix
squadgazzz Aug 28, 2024
cd54bc4
Simplified approach
squadgazzz Aug 30, 2024
3d7e0d4
postgres_open_orders_after
squadgazzz Aug 30, 2024
6dd8299
postgres_updated_order_uids_after
squadgazzz Aug 30, 2024
79a989a
Revert back to FullOrder
squadgazzz Aug 30, 2024
57d46fb
Drop read_orders_after
squadgazzz Aug 30, 2024
2604f12
Drop latest_presignature_events_after
squadgazzz Aug 30, 2024
c16ec11
Drop latest_order_events_after
squadgazzz Aug 30, 2024
2740db0
Drop updates_after
squadgazzz Aug 30, 2024
276f1b5
Drop events_after
squadgazzz Aug 30, 2024
f1951c9
Redundant code
squadgazzz Aug 30, 2024
4b91878
Redundant changes
squadgazzz Aug 30, 2024
c494446
Fetching improvements
squadgazzz Aug 30, 2024
2cc9125
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Aug 30, 2024
ab0d5d5
Fix
squadgazzz Aug 30, 2024
ef721b5
Expired filter fix
squadgazzz Sep 2, 2024
86c3f45
Safer casting
squadgazzz Sep 2, 2024
fd4d039
Naming
squadgazzz Sep 2, 2024
73976cf
Avoid long locking
squadgazzz Sep 2, 2024
86cb105
Unify DB updates
squadgazzz Sep 2, 2024
b6c5538
Const timestamp
squadgazzz Sep 2, 2024
6baaf2c
Compare prev and found creation timestamp
squadgazzz Sep 2, 2024
6519810
Redundant comments
squadgazzz Sep 2, 2024
0c8eeee
Wording
squadgazzz Sep 2, 2024
4cbf63f
Doc
squadgazzz Sep 2, 2024
a10b4e3
Drop interactions fetching
squadgazzz Sep 2, 2024
7176646
Minor formatting fix
squadgazzz Sep 2, 2024
1afe8ed
Fetch quotes in a single tx
squadgazzz Sep 2, 2024
dd8b362
Naming
squadgazzz Sep 2, 2024
2c8a45c
Leftovers
squadgazzz Sep 4, 2024
ea84a28
Extract to a new function
squadgazzz Sep 4, 2024
72d8b7a
Rename migration
squadgazzz Sep 4, 2024
55eb516
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Sep 4, 2024
459bf87
Use optional cache
squadgazzz Sep 4, 2024
fb8dad3
Remove autopilot e2e readiness check
squadgazzz Sep 4, 2024
db3a1b5
Revert "Remove autopilot e2e readiness check"
squadgazzz Sep 4, 2024
4376b22
Revert "Use optional cache"
squadgazzz Sep 4, 2024
bd95564
Safe casting
squadgazzz Sep 4, 2024
b24d9cc
Test comments
squadgazzz Sep 4, 2024
1f94ebe
Unify query
squadgazzz Sep 4, 2024
c724e35
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Sep 5, 2024
b012d45
Updated comment
squadgazzz Sep 5, 2024
9438fa0
Naming
squadgazzz Sep 5, 2024
ac250e7
Merge branch 'main' into 2831/improve-auction-update
squadgazzz Sep 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn web3_client(ethrpc: &Url, ethrpc_args: &shared::ethrpc::Arguments) -> Web
}

pub struct SolvableOrders {
pub orders: Vec<model::order::Order>,
pub orders: HashMap<domain::OrderUid, model::order::Order>,
pub quotes: HashMap<domain::OrderUid, domain::Quote>,
pub latest_settlement_block: u64,
}
37 changes: 17 additions & 20 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use {
chrono::{DateTime, Utc},
futures::{StreamExt, TryStreamExt},
model::{order::Order, quote::QuoteId},
num::ToPrimitive,
shared::{
db_order_conversions::full_order_into_model_order,
event_storing_helpers::{create_db_search_parameters, create_quote_row},
order_quoting::{QuoteData, QuoteSearchParameters, QuoteStoring},
},
std::ops::DerefMut,
std::{collections::HashMap, ops::DerefMut},
};

#[async_trait::async_trait]
Expand Down Expand Up @@ -60,7 +61,7 @@ impl QuoteStoring for Postgres {
}

impl Postgres {
pub async fn solvable_orders(&self, min_valid_to: u32) -> Result<boundary::SolvableOrders> {
pub async fn all_solvable_orders(&self, min_valid_to: u32) -> Result<boundary::SolvableOrders> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["solvable_orders"])
Expand All @@ -73,24 +74,20 @@ impl Postgres {
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(ex.deref_mut())
.await?;
let orders: Vec<Order> = database::orders::solvable_orders(&mut ex, min_valid_to as i64)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block =
database::orders::latest_settlement_block(&mut ex).await? as u64;
let quotes = self
.read_quotes(
orders
.iter()
.map(|order| domain::OrderUid(order.metadata.uid.0))
.collect::<Vec<_>>()
.iter(),
)
.await?;
let orders: HashMap<domain::OrderUid, Order> =
database::orders::solvable_orders(&mut ex, i64::from(min_valid_to))
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block = database::orders::latest_settlement_block(&mut ex)
.await?
.to_u64()
.context("latest_settlement_block is not u64")?;
let quotes = self.read_quotes(orders.keys()).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this await be parallelized with the latest_settlement_block() one? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since there is an opened transaction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. Are you saying that in order to get the quote, we need to first read the latest settlement block?

Copy link
Contributor Author

@squadgazzz squadgazzz Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an opened tx which doesn't allow executing SQL queries in parallel:

let mut tx = self.postgres.pool.begin().await.context("begin")?;
// Set the transaction isolation level to REPEATABLE READ
// so all the SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(tx.deref_mut())
.await?;

We should fetch all the data using a single DB snapshot even for quotes since there is a logic of updating them based on the onchain data:

// We only need to insert quotes for orders that will be included in an
// auction (they are needed to compute solver rewards). If placement
// failed, then the quote is not needed.
insert_quotes(
transaction,
quotes
.clone()
.into_iter()
.flatten()
.collect::<Vec<_>>()
.as_slice(),
)
.await
.context("appending quotes for onchain orders failed")?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, shouldn't we use the same tx for fetching the quote?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. Looks like I reverted the wrong change during one of the reworks. Thanks!

Ok(boundary::SolvableOrders {
orders,
quotes,
Expand Down
5 changes: 5 additions & 0 deletions crates/autopilot/src/database/quotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ impl Postgres {
&self,
orders: impl Iterator<Item = &domain::OrderUid>,
) -> Result<HashMap<domain::OrderUid, domain::Quote>, sqlx::Error> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["read_quotes"])
.start_timer();

let mut ex = self.pool.acquire().await?;
let order_uids: Vec<_> = orders.map(|uid| ByteArray(uid.0)).collect();
let quotes: HashMap<_, _> = database::orders::read_quotes(&mut ex, &order_uids)
Expand Down
147 changes: 143 additions & 4 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use {
infra::persistence::dto::AuctionId,
},
anyhow::Context,
bigdecimal::ToPrimitive,
boundary::database::byte_array::ByteArray,
chrono::Utc,
chrono::{DateTime, Utc},
database::{
order_events::OrderEventLabel,
orders::{
Expand All @@ -22,10 +23,14 @@ use {
SellTokenSource as DomainSellTokenSource,
SigningScheme as DomainSigningScheme,
},
number::conversions::{big_decimal_to_u256, u256_to_big_decimal},
futures::{StreamExt, TryStreamExt},
itertools::Itertools,
number::conversions::{big_decimal_to_u256, u256_to_big_decimal, u256_to_big_uint},
primitive_types::{H160, H256},
shared::db_order_conversions::full_order_into_model_order,
std::{
collections::{HashMap, HashSet},
ops::DerefMut,
sync::Arc,
},
tracing::Instrument,
Expand Down Expand Up @@ -71,12 +76,13 @@ impl Persistence {
.map_err(DatabaseError)
}

pub async fn solvable_orders(
/// Finds solvable orders based on the order's min validity period.
pub async fn all_solvable_orders(
&self,
min_valid_to: u32,
) -> Result<boundary::SolvableOrders, DatabaseError> {
self.postgres
.solvable_orders(min_valid_to)
.all_solvable_orders(min_valid_to)
.await
.map_err(DatabaseError)
}
Expand Down Expand Up @@ -395,6 +401,139 @@ impl Persistence {
Ok(solution)
}

/// Computes solvable orders based on the latest observed block number,
/// order creation timestamp, and minimum validity period.
pub async fn solvable_order_after(
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
&self,
current_orders: HashMap<domain::OrderUid, model::order::Order>,
after_timestamp: DateTime<Utc>,
after_block: u64,
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
let after_block = i64::try_from(after_block).context("block number value exceeds i64")?;
let mut tx = self.postgres.pool.begin().await.context("begin")?;
// Set the transaction isolation level to REPEATABLE READ
// so all the SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(tx.deref_mut())
.await?;

// Fetch the orders that were updated after the given block and were created or
// cancelled after the given timestamp.
let next_orders: HashMap<domain::OrderUid, model::order::Order> = {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["open_orders_after"])
.start_timer();

database::orders::open_orders_after(&mut tx, after_block, after_timestamp)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?
};

// Fetch quotes for new orders and also update them for the cached ones since
// they could also be updated.
Comment on lines +440 to +441
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also needed because ethflow orders could theoretically be reorged?
We should really find a way to implement ethflow orders in a nicer way to remove all those ugly edge cases. :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also needed because ethflow orders could theoretically be reorged?

Because of this code:

// We only need to insert quotes for orders that will be included in an
// auction (they are needed to compute solver rewards). If placement
// failed, then the quote is not needed.
insert_quotes(
transaction,
quotes
.clone()
.into_iter()
.flatten()
.collect::<Vec<_>>()
.as_slice(),
)
.await
.context("appending quotes for onchain orders failed")?;

let updated_quotes = {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["read_quotes"])
.start_timer();

let all_order_uids = next_orders
.keys()
.chain(current_orders.keys())
.unique()
.map(|uid| ByteArray(uid.0))
.collect::<Vec<_>>();

database::orders::read_quotes(&mut tx, &all_order_uids)
.await?
.into_iter()
.filter_map(|quote| {
let order_uid = domain::OrderUid(quote.order_uid.0);
dto::quote::into_domain(quote)
.map_err(|err| {
tracing::warn!(?order_uid, ?err, "failed to convert quote from db")
})
.ok()
.map(|quote| (order_uid, quote))
})
.collect()
};

let latest_settlement_block = database::orders::latest_settlement_block(&mut tx)
.await?
.to_u64()
.context("latest_settlement_block is not u64")?;

Self::build_solvable_orders(
current_orders,
next_orders,
updated_quotes,
latest_settlement_block,
min_valid_to,
)
}

fn build_solvable_orders(
mut current_orders: HashMap<domain::OrderUid, model::order::Order>,
next_orders: HashMap<domain::OrderUid, model::order::Order>,
mut next_quotes: HashMap<domain::OrderUid, domain::Quote>,
latest_settlement_block: u64,
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
// Blindly insert all new orders into the cache.
for (uid, order) in next_orders {
current_orders.insert(uid, order);
}

// Filter out all the invalid orders.
current_orders.retain(|_uid, order| {
let expired = order.data.valid_to < min_valid_to
|| order
.metadata
.ethflow_data
.as_ref()
.is_some_and(|data| data.user_valid_to < i64::from(min_valid_to));

let invalidated = order.metadata.invalidated;
let onchain_error = order
.metadata
.onchain_order_data
.as_ref()
.is_some_and(|data| data.placement_error.is_some());
let fulfilled = {
match order.data.kind {
model::order::OrderKind::Sell => {
order.metadata.executed_sell_amount
>= u256_to_big_uint(&order.data.sell_amount)
}
model::order::OrderKind::Buy => {
order.metadata.executed_buy_amount
>= u256_to_big_uint(&order.data.buy_amount)
}
}
};

!expired && !invalidated && !onchain_error && !fulfilled
});

// Keep only relevant quotes.
next_quotes.retain(|uid, _quote| current_orders.contains_key(uid));

Ok(boundary::SolvableOrders {
orders: current_orders,
quotes: next_quotes,
latest_settlement_block,
})
}

/// Returns the oldest settlement event for which the accociated auction is
/// not yet populated in the database.
pub async fn get_settlement_without_auction(
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl RunLoop {
}

async fn cut_auction(&self) -> Option<domain::AuctionWithId> {
let auction = match self.solvable_orders_cache.current_auction() {
let auction = match self.solvable_orders_cache.current_auction().await {
Some(auction) => auction,
None => {
tracing::debug!("no current auction");
Expand Down
Loading
Loading