Skip to content

Commit

Permalink
Fix some flaky local_node e2e tests (#2959)
Browse files Browse the repository at this point in the history
# Problem 1
[flaky
test](https://github.com/cowprotocol/services/actions/runs/10742540060/job/29795363509)

Current hypothesis:
There appears to be a race condition that causes the incremental orders
cache to sometimes miss new orders which means they can't get settled in
the test.
Consider this case:
1. orderbook receives POST order request
2. orderbook computes the current timestamp
3. orderbook spends some time doing extra checks
4. autopilot updates open orders with current timestmp
5. autopilot cache checkpoint is now AFTER order creation timestamp
6. orderbook finally adds the order to the DB (which now has creation
timestamp **before** last DB checkpoint)
7. autopilot updates open orders but only checks for orders created
**after** `5` which means it will never see the new order

I already tried to let postgres populate the order's
`creation_timestamp` with `now()` when executing the insertion but that
still didn't resolve the issues. This might be caused by the rust code
and postgres reporting different timestamps for the exact same point in
time. But maybe it could be related to how DB transactions might have
inconsistent views on the tables. 🤷‍♂️

I ended up resolving the issue by moving the DB checkpoint timestamp (of
the incremental orders cache) 1 minute into the past creating a buffer
period which will mitigate race conditions due to timestamp issues. This
means we'll re-fetch orders created or cancelled in the last minute more
often than necessary but given that the query is way faster now this
should not be an issue.

# Problem 2
[Flaky
test](https://github.com/cowprotocol/services/actions/runs/10746259879/job/29806839501?pr=2959#logs)

The issue seems to be that the DB can sometimes deadlock when multiple
transactions are waiting for each other. Luckily postgres is able to
detect those deadlocks and simply cancels on of the transactions. If the
transaction to truncate the tables fails we simply retry it until it
works.

# Problem 3
Native price cache expiry is 30s by default which is sometimes too long
for tests which require a price for a token to first be `NoLiquidity`
and an actual price later on (default wait timeout is also 30s). The
simple fix is to reduce the native price cache validity.

# Problem 4
[flaky
test](https://github.com/cowprotocol/services/actions/runs/10750833964/job/29817436705?pr=2959)

Many tests are synchronized by waiting for the current auction to reach
a specific number of orders. This is very prone to race conditions so I
updated those tests to use safer ways to synchronize on the events.
I also added the ability to return `Option<bool>` and `Result<bool>`
from the future we try to await to make some of the code more ergonomic.

## How to test
Ran flaky test runner for a long while ([41 successful
runs](https://github.com/cowprotocol/services/actions/runs/10753333251/job/29822651957?pr=2959)
for all tests). It still encountered an error but I was not actually
able to reproduce this error but running only that error in a loop
([>200
tries](https://github.com/cowprotocol/services/actions/runs/10754405993/job/29824987289?pr=2959)).
It looks to me that some errors have a higher likelihood if you run them
together with a bunch of other test over an over. But since this is not
how we run our tests for real I think I'll call it a day here.
  • Loading branch information
MartinquaXD authored Sep 9, 2024
1 parent 8f36763 commit a34ba2a
Show file tree
Hide file tree
Showing 18 changed files with 294 additions and 368 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ jobs:
with:
file: docker-compose.yaml
up-opts: -d db migrations
- run: cargo nextest run -p e2e forked_node --nocapture --run-ignored ignored-only
- run: cargo nextest run -p e2e forked_node --test-threads 1 --run-ignored ignored-only --failure-output final

test-driver:
timeout-minutes: 60
Expand Down Expand Up @@ -212,7 +212,7 @@ jobs:
attempt=1
while true; do
echo "Running test attempt #$attempt"
if ! cargo nextest run -p e2e local_node_no_liquidity_limit_order --test-threads 1 --failure-output final --run-ignored ignored-only; then
if ! cargo nextest run -p e2e local_node --test-threads 1 --failure-output final --run-ignored ignored-only; then
exit 1
fi
attempt=$((attempt+1))
Expand Down
2 changes: 2 additions & 0 deletions crates/autopilot/src/boundary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ pub struct SolvableOrders {
pub orders: HashMap<domain::OrderUid, model::order::Order>,
pub quotes: HashMap<domain::OrderUid, domain::Quote>,
pub latest_settlement_block: u64,
/// Used as a checkpoint - meaning at this point in time
/// **at least** the stored orders were present in the system.
pub fetched_from_db: chrono::DateTime<chrono::Utc>,
}
1 change: 1 addition & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl Persistence {
after_block: u64,
min_valid_to: u32,
) -> anyhow::Result<boundary::SolvableOrders> {
tracing::debug!(?after_timestamp, ?after_block, "fetch orders updated since");
let after_block = i64::try_from(after_block).context("block number value exceeds i64")?;
let started_at = chrono::offset::Utc::now();
let mut tx = self.postgres.pool.begin().await.context("begin")?;
Expand Down
11 changes: 10 additions & 1 deletion crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ impl SolvableOrdersCache {
.collect::<Result<_, _>>()?,
surplus_capturing_jit_order_owners,
};

*self.cache.lock().await = Some(Inner {
auction,
solvable_orders: db_solvable_orders,
Expand Down Expand Up @@ -360,7 +361,15 @@ impl SolvableOrdersCache {
None => self.persistence.all_solvable_orders(min_valid_to).boxed(),
};

fetch_orders.await
let mut orders = fetch_orders.await?;

// Move the checkpoint slightly back in time to mitigate race conditions
// caused by inconsistencies of stored timestamps. See #2959 for more details.
// This will cause us to fetch orders created or cancelled in the buffer
// period multiple times but that is a small price to pay for not missing
// orders.
orders.fetched_from_db -= chrono::TimeDelta::seconds(60);
Ok(orders)
}

/// Executed orders filtering in parallel.
Expand Down
31 changes: 27 additions & 4 deletions crates/e2e/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ pub fn config_tmp_file<C: AsRef<[u8]>>(content: C) -> TempPath {
/// long time.
pub const TIMEOUT: Duration = Duration::from_secs(30);

/// Repeatedly evaluate condition until it returns true or the timeout is
/// reached. If condition evaluates to true, Ok(()) is returned. If the timeout
/// Repeatedly evaluates condition until it returns a truthy value
/// (true, Some(true), Result(true)) or the timeout is reached.
/// If condition evaluates to truthy, Ok(()) is returned. If the timeout
/// is reached Err is returned.
pub async fn wait_for_condition<Fut>(
timeout: Duration,
mut condition: impl FnMut() -> Fut,
) -> Result<()>
where
Fut: Future<Output = bool>,
Fut: Future<Output: AwaitableCondition>,
{
let start = std::time::Instant::now();
while !condition().await {
while !condition().await.was_successful() {
tokio::time::sleep(Duration::from_millis(200)).await;
if start.elapsed() > timeout {
return Err(anyhow!("timeout"));
Expand All @@ -58,6 +59,28 @@ where
Ok(())
}

pub trait AwaitableCondition {
fn was_successful(&self) -> bool;
}

impl AwaitableCondition for bool {
fn was_successful(&self) -> bool {
*self
}
}

impl AwaitableCondition for Option<bool> {
fn was_successful(&self) -> bool {
self.is_some_and(|inner| inner)
}
}

impl AwaitableCondition for Result<bool> {
fn was_successful(&self) -> bool {
self.as_ref().is_ok_and(|inner| *inner)
}
}

static NODE_MUTEX: Mutex<()> = Mutex::new(());

const DEFAULT_FILTERS: &[&str] = &[
Expand Down
37 changes: 29 additions & 8 deletions crates/e2e/src/setup/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ impl<'a> Services<'a> {
"--amount-to-estimate-prices-with=1000000000000000000".to_string(),
"--block-stream-poll-interval=1s".to_string(),
"--simulation-node-url=http://localhost:8545".to_string(),
"--native-price-cache-max-age=2s".to_string(),
"--native-price-prefetch-time=500ms".to_string(),
]
.into_iter()
}
Expand Down Expand Up @@ -324,6 +326,9 @@ impl<'a> Services<'a> {
.expect("waiting for autopilot timed out");
}

/// Fetches the current auction. Don't use this as a synchronization
/// mechanism in tests because that is prone to race conditions
/// which would make tests flaky.
pub async fn get_auction(&self) -> dto::AuctionWithId {
let response = self
.http
Expand Down Expand Up @@ -439,10 +444,6 @@ impl<'a> Services<'a> {
}
}

pub async fn solvable_orders(&self) -> usize {
self.get_auction().await.auction.orders.len()
}

/// Retrieve an [`Order`]. If the respons status is not `200`, return the
/// status and the body.
pub async fn get_order(&self, uid: &OrderUid) -> Result<Order, (StatusCode, String)> {
Expand Down Expand Up @@ -605,10 +606,30 @@ impl<'a> Services<'a> {

pub async fn clear_database() {
tracing::info!("Clearing database.");
let mut db = sqlx::PgConnection::connect(LOCAL_DB_URL).await.unwrap();
let mut db = db.begin().await.unwrap();
database::clear_DANGER_(&mut db).await.unwrap();
db.commit().await.unwrap();

async fn truncate_tables() -> Result<(), sqlx::Error> {
let mut db = sqlx::PgConnection::connect(LOCAL_DB_URL).await?;
let mut db = db.begin().await?;
database::clear_DANGER_(&mut db).await?;
db.commit().await
}

// This operation can fail when postgres detects a deadlock.
// It will terminate one of the deadlocking requests and if it decideds
// to terminate this request we need to retry it.
let mut attempt = 0;
loop {
match truncate_tables().await {
Ok(_) => return,
Err(err) => {
tracing::error!(?err, "failed to truncate tables");
}
}
attempt += 1;
if attempt >= 10 {
panic!("repeatedly failed to clear DB");
}
}
}

pub type Db = sqlx::Pool<sqlx::Postgres>;
26 changes: 17 additions & 9 deletions crates/e2e/tests/e2e/cow_amm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
SigningScheme,
Solution,
},
std::collections::HashMap,
std::collections::{HashMap, HashSet},
web3::signing::SecretKeyRef,
};

Expand Down Expand Up @@ -551,7 +551,10 @@ async fn cow_amm_driver_support(web3: Web3) {

wait_for_condition(TIMEOUT, || async {
let auctions = mock_solver.get_auctions();
let found_cow_amms = &auctions.last().unwrap().surplus_capturing_jit_order_owners;
let found_cow_amms: HashSet<_> = auctions
.iter()
.flat_map(|a| a.surplus_capturing_jit_order_owners.clone())
.collect();

expected_cow_amms
.iter()
Expand All @@ -576,13 +579,18 @@ async fn cow_amm_driver_support(web3: Web3) {

wait_for_condition(TIMEOUT, || async {
let auctions = mock_solver.get_auctions();
let auction_prices = &auctions.last().unwrap().tokens;

expected_prices.iter().all(|token| {
auction_prices
.get(token)
.is_some_and(|t| t.reference_price.is_some())
})
let auction_prices: HashSet<_> = auctions
.iter()
.flat_map(|a| {
a.tokens
.iter()
.filter_map(|(token, info)| info.reference_price.map(|_| token))
})
.collect();

expected_prices
.iter()
.all(|token| auction_prices.contains(token))
})
.await
.unwrap();
Expand Down
19 changes: 4 additions & 15 deletions crates/e2e/tests/e2e/eth_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ async fn eth_integration(web3: Web3) {
);

let trader_a_eth_balance_before = web3.eth().balance(trader_a.address(), None).await.unwrap();
let trader_b_eth_balance_before = web3.eth().balance(trader_b.address(), None).await.unwrap();

let services = Services::new(onchain.contracts()).await;
services.start_protocol(solver).await;
Expand Down Expand Up @@ -103,23 +102,13 @@ async fn eth_integration(web3: Web3) {
services.create_order(&order_buy_eth_b).await.unwrap();

tracing::info!("Waiting for trade.");
wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 2 })
.await
.unwrap();

let trade_happened = || async {
let balance_a = web3.eth().balance(trader_a.address(), None).await.unwrap();
let balance_b = web3.eth().balance(trader_b.address(), None).await.unwrap();
balance_a != trader_a_eth_balance_before && balance_b != trader_b_eth_balance_before

let trader_a_eth_decreased = (balance_a - trader_a_eth_balance_before) == to_wei(49);
let trader_b_eth_increased = balance_b >= to_wei(49);
trader_a_eth_decreased && trader_b_eth_increased
};
wait_for_condition(TIMEOUT, trade_happened).await.unwrap();

// Check matching
let trader_a_eth_balance_after = web3.eth().balance(trader_a.address(), None).await.unwrap();
let trader_b_eth_balance_after = web3.eth().balance(trader_b.address(), None).await.unwrap();
assert_eq!(
trader_a_eth_balance_after - trader_a_eth_balance_before,
to_wei(49)
);
assert!(trader_b_eth_balance_after >= to_wei(49));
}
90 changes: 15 additions & 75 deletions crates/e2e/tests/e2e/ethflow.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use {
anyhow::bail,
autopilot::{
database::onchain_order_events::ethflow_events::WRAP_ALL_SELECTOR,
infra::persistence::dto,
},
autopilot::database::onchain_order_events::ethflow_events::WRAP_ALL_SELECTOR,
contracts::{CoWSwapEthFlow, ERC20Mintable, WETH9},
database::order_events::OrderEventLabel,
e2e::{nodes::local_node::TestNodeApi, setup::*, tx, tx_value},
Expand Down Expand Up @@ -105,11 +102,8 @@ async fn eth_flow_tx(web3: Web3) {
.await;

tracing::info!("waiting for trade");
wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 1 })
.await
.unwrap();

test_order_was_settled(&services, &ethflow_order, &web3).await;
test_order_was_settled(&ethflow_order, &web3).await;

// make sure the fee was charged for zero fee limit orders
let fee_charged = || async {
Expand Down Expand Up @@ -192,11 +186,7 @@ async fn eth_flow_indexing_after_refund(web3: Web3) {
submit_order(&ethflow_order, trader.account(), onchain.contracts()).await;

tracing::info!("waiting for trade");
wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 1 })
.await
.unwrap();

test_order_was_settled(&services, &ethflow_order, &web3).await;
test_order_was_settled(&ethflow_order, &web3).await;

// Check order events
let events = crate::database::events_of_order(
Expand Down Expand Up @@ -271,12 +261,6 @@ async fn test_order_availability_in_api(
for address in [owner, &contracts.ethflow.address()] {
test_account_query(address, services.client(), order, owner, contracts).await;
}

wait_for_condition(TIMEOUT, || async { services.solvable_orders().await == 1 })
.await
.unwrap();

test_auction_query(services, order, contracts).await;
}

async fn test_trade_availability_in_api(
Expand All @@ -299,21 +283,19 @@ async fn test_trade_availability_in_api(
}
}

async fn test_order_was_settled(
services: &Services<'_>,
ethflow_order: &ExtendedEthFlowOrder,
web3: &Web3,
) {
let auction_is_empty = || async { services.solvable_orders().await == 0 };
wait_for_condition(TIMEOUT, auction_is_empty).await.unwrap();
async fn test_order_was_settled(ethflow_order: &ExtendedEthFlowOrder, web3: &Web3) {
wait_for_condition(TIMEOUT, || async {
let buy_token = ERC20Mintable::at(web3, ethflow_order.0.buy_token);
let receiver_buy_token_balance = buy_token
.balance_of(ethflow_order.0.receiver)
.call()
.await
.expect("Unable to get token balance");

let buy_token = ERC20Mintable::at(web3, ethflow_order.0.buy_token);
let receiver_buy_token_balance = buy_token
.balance_of(ethflow_order.0.receiver)
.call()
.await
.expect("Unable to get token balance");
assert!(receiver_buy_token_balance >= ethflow_order.0.buy_amount);
receiver_buy_token_balance >= ethflow_order.0.buy_amount
})
.await
.unwrap();
}

async fn test_orders_query(
Expand Down Expand Up @@ -349,16 +331,6 @@ async fn test_account_query(
test_order_parameters(&response[0], order, owner, contracts).await;
}

async fn test_auction_query(
services: &Services<'_>,
order: &ExtendedEthFlowOrder,
contracts: &Contracts,
) {
let response = services.get_auction().await;
assert_eq!(response.auction.orders.len(), 1);
test_auction_order_parameters(&response.auction.orders[0], order, contracts).await;
}

enum TradeQuery {
ByUid(OrderUid),
ByOwner(H160),
Expand Down Expand Up @@ -424,38 +396,6 @@ async fn test_order_parameters(
assert_eq!(response.interactions.pre[0].call_data, WRAP_ALL_SELECTOR);
}

async fn test_auction_order_parameters(
response: &dto::order::Order,
order: &ExtendedEthFlowOrder,
contracts: &Contracts,
) {
// Expected values from actual EIP1271 order instead of eth-flow order
assert_eq!(response.valid_to, u32::MAX);
assert_eq!(response.owner, contracts.ethflow.address());
assert_eq!(response.sell_token, contracts.weth.address());

match order.0.fee_amount.is_zero() {
true => {
assert_eq!(response.class, OrderClass::Limit);
}
false => {
assert_eq!(response.class, OrderClass::Market);
}
}
assert!(order
.is_valid_cowswap_signature(&response.signature, contracts)
.await
.is_ok());

// Requires wrapping first
assert_eq!(response.pre_interactions.len(), 1);
assert_eq!(
response.pre_interactions[0].target,
contracts.ethflow.address()
);
assert_eq!(response.pre_interactions[0].call_data, WRAP_ALL_SELECTOR);
}

pub struct ExtendedEthFlowOrder(pub EthflowOrder);

impl ExtendedEthFlowOrder {
Expand Down
Loading

0 comments on commit a34ba2a

Please sign in to comment.