Skip to content

Commit

Permalink
Banned users filtering maintenance task (#2937)
Browse files Browse the repository at this point in the history
# Description
Banned user filtering is now part of the auction update critical path
and sometimes takes too much time.

![image](https://github.com/user-attachments/assets/678f769c-48c7-43b3-b4a0-904a7aaad5f7)

# Changes

To speed things up, a background maintenance task is introduced that
periodically re-checks for the address's banned status. Doesn't clean
the cache, so it can grow indefinitely.

Also, switches to `RwLock` to avoid thread blocking since the cache is
used in the orderbook's `/quote` endpoint.

## How to test
New DB test. Existing e2e tests.
  • Loading branch information
squadgazzz authored Sep 5, 2024
1 parent 521a7b4 commit 12aa87b
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/order-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cached = { workspace = true }
contracts = { path = "../contracts" }
ethcontract = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[lints]
Expand Down
132 changes: 118 additions & 14 deletions crates/order-validation/src/banned.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,122 @@
use {
cached::{Cached, TimedCache},
contracts::ChainalysisOracle,
ethcontract::{errors::MethodError, futures::future::join_all, H160},
std::{collections::HashSet, sync::Mutex},
std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::RwLock,
};

/// A list of banned users and an optional registry that can be checked onchain.
pub struct Users {
list: HashSet<H160>,
onchain: Option<Onchain>,
onchain: Option<Arc<Onchain>>,
}

#[derive(Clone)]
struct UserMetadata {
is_banned: bool,
last_updated: Instant,
}

struct Onchain {
contract: ChainalysisOracle,
cache: Mutex<TimedCache<H160, bool>>,
cache: RwLock<HashMap<H160, UserMetadata>>,
}

const TTL: u64 = 60 * 60; // 1 hour
impl Onchain {
pub fn new(contract: ChainalysisOracle) -> Arc<Self> {
let onchain = Arc::new(Self {
contract,
cache: Default::default(),
});

onchain.clone().spawn_maintenance_task();

onchain
}

/// Spawns a background task that periodically checks the cache for expired
/// entries and re-run checks for them.
///
/// Doesn't clean the cache, so it can grow indefinitely.
fn spawn_maintenance_task(self: Arc<Self>) {
let cache_expiry = Duration::from_secs(60 * 60);
let maintenance_timeout = Duration::from_secs(60);
let detector = Arc::clone(&self);

tokio::task::spawn(async move {
loop {
let start = Instant::now();

let expired_data: Vec<_> = {
let now = Instant::now();
let cache = detector.cache.read().await;
cache
.iter()
.filter_map(|(address, metadata)| {
let expired = now
.checked_duration_since(metadata.last_updated)
.unwrap_or_default()
>= cache_expiry - maintenance_timeout;

expired.then_some((*address, metadata.clone()))
})
.collect()
};

let results = join_all(expired_data.into_iter().map(|(address, metadata)| {
let detector = detector.clone();
async move {
match detector.fetch(address).await {
Ok(result) => Some((
address,
UserMetadata {
is_banned: result,
..metadata
},
)),
Err(err) => {
tracing::warn!(
?address,
?err,
"unable to determine banned status in the background task"
);
None
}
}
}
}))
.await
.into_iter()
.flatten();

detector.insert_many_into_cache(results).await;

let remaining_sleep = maintenance_timeout
.checked_sub(start.elapsed())
.unwrap_or_default();
tokio::time::sleep(remaining_sleep).await;
}
});
}

async fn insert_many_into_cache(&self, addresses: impl Iterator<Item = (H160, UserMetadata)>) {
let mut cache = self.cache.write().await;
let now = Instant::now();
for (address, metadata) in addresses {
cache.insert(
address,
UserMetadata {
last_updated: now,
..metadata
},
);
}
}
}

impl Users {
/// Creates a new `Users` instance that checks the hardcoded list and uses
Expand All @@ -25,10 +125,7 @@ impl Users {
pub fn new(contract: Option<ChainalysisOracle>, banned_users: Vec<H160>) -> Self {
Self {
list: HashSet::from_iter(banned_users),
onchain: contract.map(|contract| Onchain {
contract,
cache: Mutex::new(TimedCache::with_lifespan(TTL)),
}),
onchain: contract.map(Onchain::new),
}
}

Expand Down Expand Up @@ -71,12 +168,12 @@ impl Users {
};
let need_lookup: Vec<_> = {
// Scope here to release the lock before the async lookups
let mut cache = onchain.cache.lock().expect("unpoisoned");
let cache = onchain.cache.read().await;
need_lookup
.into_iter()
.filter(|address| {
if let Some(is_banned) = cache.cache_get(address) {
is_banned.then(|| banned.insert(*address));
if let Some(metadata) = &mut cache.get(address) {
metadata.is_banned.then(|| banned.insert(*address));
false
} else {
true
Expand All @@ -92,11 +189,18 @@ impl Users {
)
.await;

let mut cache = onchain.cache.lock().expect("unpoisoned");
let mut cache = onchain.cache.write().await;
let now = Instant::now();
for (address, result) in to_cache {
match result {
Ok(is_banned) => {
cache.cache_set(address, is_banned);
cache.insert(
address,
UserMetadata {
is_banned,
last_updated: now,
},
);
is_banned.then(|| banned.insert(address));
}
Err(err) => {
Expand Down

0 comments on commit 12aa87b

Please sign in to comment.