Skip to content

Commit

Permalink
[EASY] Change token quality cache to DashMap (#2974)
Browse files Browse the repository at this point in the history
# Description
Since the token quality cache is under the hood a `HashMap`, we would
benefit by changing it to a `DashMap`.

# Changes
- Change `RwLock` to `DashMap` in token quality cache.

## How to test
1. Regression test

---------

Co-authored-by: ilya <ilya@cow.fi>
  • Loading branch information
m-lord-renkse and squadgazzz authored Sep 12, 2024
1 parent fd63ff4 commit 8c863d5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 55 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cached = { workspace = true }
chrono = { workspace = true, features = ["clock"] }
clap = { workspace = true }
contracts = { path = "../contracts" }
dashmap = "6.1.0"
database = { path = "../database" }
ttl_cache = "0.5"
derivative = { workspace = true }
Expand Down
78 changes: 23 additions & 55 deletions crates/shared/src/bad_token/cache.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
use {
super::{BadTokenDetecting, TokenQuality},
anyhow::Result,
dashmap::DashMap,
futures::future::join_all,
primitive_types::H160,
std::{
collections::HashMap,
ops::Div,
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::RwLock,
};

pub struct CachingDetector {
inner: Box<dyn BadTokenDetecting>,
cache: RwLock<HashMap<H160, (Instant, TokenQuality)>>,
cache: DashMap<H160, (Instant, TokenQuality)>,
cache_expiry: Duration,
prefetch_time: Duration,
}

#[async_trait::async_trait]
impl BadTokenDetecting for CachingDetector {
async fn detect(&self, token: H160) -> Result<TokenQuality> {
if let Some(quality) = self.get_from_cache(&token, Instant::now()).await {
if let Some(quality) = self.get_from_cache(&token, Instant::now()) {
return Ok(quality);
}

let result = self.inner.detect(token).await?;
self.insert_into_cache(token, result.clone()).await;
self.cache.insert(token, (Instant::now(), result.clone()));
Ok(result)
}
}
Expand All @@ -52,29 +51,16 @@ impl CachingDetector {
detector
}

async fn get_from_cache(&self, token: &H160, now: Instant) -> Option<TokenQuality> {
match self.cache.read().await.get(token) {
Some((instant, quality))
if now.checked_duration_since(*instant).unwrap_or_default() < self.cache_expiry =>
{
Some(quality.clone())
}
_ => None,
}
}

async fn insert_into_cache(&self, token: H160, quality: TokenQuality) {
self.cache
.write()
.await
.insert(token, (Instant::now(), quality));
fn get_from_cache(&self, token: &H160, now: Instant) -> Option<TokenQuality> {
let (instant, quality) = self.cache.get(token)?.value().clone();
let still_valid = now.saturating_duration_since(instant) < self.cache_expiry;
still_valid.then_some(quality)
}

async fn insert_many_into_cache(&self, tokens: impl Iterator<Item = (H160, TokenQuality)>) {
let mut cache = self.cache.write().await;
fn insert_many_into_cache(&self, tokens: impl Iterator<Item = (H160, TokenQuality)>) {
let now = Instant::now();
tokens.into_iter().for_each(|(token, quality)| {
cache.insert(token, (now, quality));
self.cache.insert(token, (now, quality));
});
}

Expand All @@ -93,22 +79,14 @@ impl CachingDetector {
loop {
let start = Instant::now();

let expired_tokens: Vec<H160> = {
let cache = detector.cache.read().await;
let now = Instant::now();
cache
.iter()
.filter_map(|(token, (instant, _))| {
(now.checked_duration_since(*instant).unwrap_or_default()
>= prefetch_time_to_expire)
.then_some(*token)
})
.collect()
};

let results = join_all(expired_tokens.into_iter().map(|token| {
let futures = detector.cache.iter().filter_map(|entry| {
let (token, (instant, _)) = entry.pair();
let (token, instant) = (*token, *instant);
if start.saturating_duration_since(instant) < prefetch_time_to_expire {
return None;
}
let detector = detector.clone();
async move {
Some(async move {
match detector.inner.detect(token).await {
Ok(result) => Some((token, result)),
Err(err) => {
Expand All @@ -120,17 +98,13 @@ impl CachingDetector {
None
}
}
}
}))
.await
.into_iter()
.flatten();
})
});

detector.insert_many_into_cache(results).await;
let results = join_all(futures).await;
detector.insert_many_into_cache(results.into_iter().flatten());

let remaining_sleep = maintenance_timeout
.checked_sub(start.elapsed())
.unwrap_or_default();
let remaining_sleep = maintenance_timeout.saturating_sub(start.elapsed());
tokio::time::sleep(remaining_sleep).await;
}
});
Expand Down Expand Up @@ -175,18 +149,12 @@ mod tests {
Duration::from_millis(200),
);
let now = Instant::now();
detector
.cache
.write()
.await
.insert(token, (now, TokenQuality::Good));
detector.cache.insert(token, (now, TokenQuality::Good));
assert!(detector
.get_from_cache(&token, now + Duration::from_secs(1))
.await
.is_some());
assert!(detector
.get_from_cache(&token, now + Duration::from_secs(3))
.await
.is_none());
}

Expand Down

0 comments on commit 8c863d5

Please sign in to comment.