Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Aug 23, 2024
1 parent 8551f81 commit efc0d58
Showing 1 changed file with 38 additions and 48 deletions.
86 changes: 38 additions & 48 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ use {
crate::{boundary, domain, infra::persistence::dto},
anyhow::{Context, Result},
chrono::{DateTime, Utc},
database::orders::FullOrder,
ethcontract::jsonrpc::futures_util::stream::BoxStream,
futures::{StreamExt, TryStreamExt},
model::{order::Order, quote::QuoteId},
shared::{
db_order_conversions::full_order_into_model_order,
event_storing_helpers::{create_db_search_parameters, create_quote_row},
order_quoting::{QuoteData, QuoteSearchParameters, QuoteStoring},
},
sqlx::PgConnection,
std::{collections::HashMap, ops::DerefMut},
};

Expand Down Expand Up @@ -66,30 +69,8 @@ impl Postgres {
.with_label_values(&["solvable_orders"])
.start_timer();

let mut ex = self.pool.begin().await?;
// Set the transaction isolation level to REPEATABLE READ
// so the both SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
.execute(ex.deref_mut())
.await?;
let orders: HashMap<domain::OrderUid, Order> =
database::orders::solvable_orders(&mut ex, min_valid_to as i64)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block =
database::orders::latest_settlement_block(&mut ex).await? as u64;
let quotes = self.read_quotes(orders.keys()).await?;
Ok(boundary::SolvableOrders {
orders,
quotes,
latest_settlement_block,
})
self.fetch_orders_data(|ex| database::orders::solvable_orders(ex, min_valid_to as i64))
.await
}

pub async fn orders_after(
Expand All @@ -99,9 +80,33 @@ impl Postgres {
) -> Result<boundary::SolvableOrders> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["solvable_orders_after"])
.with_label_values(&["orders_after"])
.start_timer();

self.fetch_orders_data(|ex| {
database::orders::full_orders_after(ex, after_timestamp, min_valid_to as i64)
})
.await
}

pub async fn replace_current_auction(&self, auction: &dto::Auction) -> Result<dto::AuctionId> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["replace_current_auction"])
.start_timer();

let data = serde_json::to_value(auction)?;
let mut ex = self.pool.begin().await?;
database::auction::delete_all_auctions(&mut ex).await?;
let id = database::auction::save(&mut ex, &data).await?;
ex.commit().await?;
Ok(id)
}

async fn fetch_orders_data<F>(&self, orders_fn: F) -> Result<boundary::SolvableOrders>
where
F: FnOnce(&mut PgConnection) -> BoxStream<'_, std::result::Result<FullOrder, sqlx::Error>>,
{
let mut ex = self.pool.begin().await?;
// Set the transaction isolation level to REPEATABLE READ
// so the both SELECT queries below are executed in the same database snapshot
Expand All @@ -110,15 +115,14 @@ impl Postgres {
.execute(ex.deref_mut())
.await?;

let orders: HashMap<domain::OrderUid, Order> =
database::orders::full_orders_after(&mut ex, after_timestamp, min_valid_to as i64)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let orders: HashMap<domain::OrderUid, Order> = orders_fn(&mut ex)
.map(|result| match result {
Ok(order) => full_order_into_model_order(order)
.map(|order| (domain::OrderUid(order.metadata.uid.0), order)),
Err(err) => Err(anyhow::Error::from(err)),
})
.try_collect()
.await?;
let latest_settlement_block =
database::orders::latest_settlement_block(&mut ex).await? as u64;
let quotes = self.read_quotes(orders.keys()).await?;
Expand All @@ -128,18 +132,4 @@ impl Postgres {
latest_settlement_block,
})
}

pub async fn replace_current_auction(&self, auction: &dto::Auction) -> Result<dto::AuctionId> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["replace_current_auction"])
.start_timer();

let data = serde_json::to_value(auction)?;
let mut ex = self.pool.begin().await?;
database::auction::delete_all_auctions(&mut ex).await?;
let id = database::auction::save(&mut ex, &data).await?;
ex.commit().await?;
Ok(id)
}
}

0 comments on commit efc0d58

Please sign in to comment.