Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit bound to amortized clean-up #2489

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/internet_identity/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ
use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
use crate::stats::activity_stats::ActivityStats;
use crate::stats::event_stats::EventKey;
use crate::storage::anchor::Anchor;
use crate::storage::MAX_ENTRIES;
use crate::{random_salt, Storage};
Expand Down Expand Up @@ -103,6 +104,11 @@ pub struct PersistentState {
// event_aggregations is expected to have a lot of entries, thus counting by iterating over it is not
// an option.
pub event_aggregations_count: u64,
// Key into the event_data BTreeMap where the 24h tracking window starts.
// This key is used to prune old entries from the 24h event aggregations.
// If it is `None`, then the 24h pruning window starts from the newest entry in the event_data
// BTreeMap minus 24h.
pub event_stats_24h_pruning_start: Option<EventKey>,
}

impl Default for PersistentState {
Expand All @@ -118,6 +124,7 @@ impl Default for PersistentState {
max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
event_data_count: 0,
event_aggregations_count: 0,
event_stats_24h_pruning_start: None,
}
}
}
Expand Down
166 changes: 122 additions & 44 deletions src/internet_identity/src/stats/event_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::stats::event_stats::event_aggregations::AGGREGATIONS;
use crate::stats::event_stats::Event::PruneEvent;
use crate::storage::Storage;
use crate::{state, DAY_NS, MINUTE_NS};
use candid::CandidType;
use ic_cdk::api::call::CallResult;
use ic_cdk::api::time;
use ic_cdk::{caller, id, trap};
Expand All @@ -73,12 +74,15 @@ use std::time::Duration;

/// This module defines the aggregations over the events.
mod event_aggregations;

pub use event_aggregations::*;

const MAX_EVENTS_TO_PRUNE: usize = 100;
frederikrothenberger marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(test)]
mod tests;

#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
#[derive(Deserialize, Serialize, CandidType, Clone, Eq, PartialEq, Debug, Ord, PartialOrd)]
pub struct EventKey {
/// Timestamp of the event.
pub time: Timestamp,
Expand All @@ -104,6 +108,19 @@ impl EventKey {
counter: u16::MAX,
}
}

/// Returns the key immediately following `self` in the `(time, counter)`
/// ordering: increments the counter, rolling over into the next
/// timestamp (with the counter reset to 0) once the counter is exhausted.
pub fn next_key(&self) -> Self {
    if self.counter == u16::MAX {
        // Counter space for this timestamp is used up; advance to the
        // next timestamp and restart the counter.
        Self {
            time: self.time + 1,
            counter: 0,
        }
    } else {
        Self {
            time: self.time,
            counter: self.counter + 1,
        }
    }
}
}

#[derive(Deserialize, Serialize, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -276,69 +293,130 @@ fn update_events_internal<M: Memory>(event: EventData, now: Timestamp, s: &mut S
s.event_data_count += 1;
});

let pruned_24h = if let Some((prev_key, _)) = s.event_data.iter_upper_bound(&current_key).next()
{
// `timestamp` denotes the time of the last event recorded just before `now`
// The difference (now - timestamp) is the time that has passed since the last event
// and hence the last time the daily stats were updated.
// Therefore, we need to prune all events from the daily aggregations that happened in
// that same difference, but 24 hours ago.
//
// |<----------------------- 24h ----------------------->|
// |--|--------------------------------------------------|--| --> time
// ^ ^ ^ ^
// | | | └ now
// | └ now - 24h (prune_window_end) └ timestamp
// └ timestamp - 24h (prune_window_start)

let prune_window_start = prev_key.time - DAY_NS;
let prune_window_end = now - DAY_NS;
s.event_data
.range(EventKey::min_key(prune_window_start)..=EventKey::max_key(prune_window_end))
.collect::<Vec<_>>()
} else {
// there is no event before `now` in the db, so the list of pruned events is empty
vec![]
};

let mut aggregations_db_wrapper = CountingAggregationsWrapper(&mut s.event_aggregations);
// Update 24h aggregations
AGGREGATIONS.iter().for_each(|aggregation| {
update_aggregation(
|(_, data)| aggregation.process_event(AggregationWindow::Day, data),
current_key.clone(),
event.clone(),
&pruned_24h,
&mut CountingAggregationsWrapper(&mut s.event_aggregations),
);
});
update_aggregations(
&current_key,
&event,
&prune_events_24h(now, &current_key, &s.event_data),
AggregationWindow::Day,
&mut aggregations_db_wrapper,
);

// This pruning _deletes_ the data older than 30 days. Do this after the 24h aggregation
// otherwise the daily stats become inaccurate on the unlikely event that there is no activity
// for 30 days.
let pruned_30d = prune_events(&mut s.event_data, now);

let pruned_30d = prune_events_30d(&mut s.event_data, now);
// Update 30d aggregations
update_aggregations(
&current_key,
&event,
&pruned_30d,
AggregationWindow::Month,
&mut aggregations_db_wrapper,
);
}

/// Iterates over all aggregations and updates them based on the new event and the pruned events.
/// Iterates over all aggregations and updates them based on the new event and the pruned events.
///
/// * `event_key` / `event_data` - the event being added.
/// * `pruned_events` - events that fell out of the aggregation window and must be subtracted.
/// * `window` - which aggregation window (24h or 30d) is being updated.
/// * `aggregations_db` - counting wrapper around the stable aggregations map.
fn update_aggregations<M: Memory>(
    event_key: &EventKey,
    event_data: &EventData,
    pruned_events: &[(EventKey, EventData)],
    window: AggregationWindow,
    aggregations_db: &mut CountingAggregationsWrapper<M>,
) {
    for aggregation in AGGREGATIONS.iter() {
        update_aggregation(
            |(_, data)| aggregation.process_event(window.clone(), data),
            event_key.clone(),
            event_data.clone(),
            pruned_events,
            aggregations_db,
        );
    }
}

/// Adds an event to the event_data map and simultaneously removes events older than the retention period (30d).
/// Collects events older than 24h that need to be removed from the 24h aggregations.
/// Given events are kept for 30 days, the events are not deleted from the supplied `db`.
/// Instead, this function simply updates the `event_stats_24h_pruning_start` state variable
/// that denotes the first event that should be pruned in the next call.
///
/// Returns a vec of tuples of the pruned events and their timestamps, at most [MAX_EVENTS_TO_PRUNE].
fn prune_events_24h<M: Memory>(
frederikrothenberger marked this conversation as resolved.
Show resolved Hide resolved
now: Timestamp,
current_key: &EventKey,
db: &StableBTreeMap<EventKey, EventData, M>,
) -> Vec<(EventKey, EventData)> {
/// Calculates the 24h window start based on the current key:
/// - The current key is used to find the last event right before it.
/// - The timestamp of that key is then shifted back 24h.
///
/// This assumes that the 24h window has been correctly pruned in the past, including up to the
/// previous event.
///
/// ```
/// |<----------------------- 24h ----------------------->|
/// |--|--------------------------------------------------|--| --> time
/// ^ ^ ^ ^
/// | | | └ current_key
/// | └ now - 24h (prune_window_end) └ previous event
/// └ previous event timestamp - 24h (prune_window_start)
/// ```
fn prune_window_start_from_current_key<M: Memory>(
current_key: &EventKey,
db: &StableBTreeMap<EventKey, EventData, M>,
) -> Option<EventKey> {
db.iter_upper_bound(current_key)
.next()
.map(|(k, _)| EventKey::min_key(k.time - DAY_NS))
}

let prune_window_start =
// Continue pruning from the last key. This value will be set if the 24h window has been pruned
// before.
state::persistent_state(|s| s.event_stats_24h_pruning_start.clone()).or_else(|| {
// Alternatively, calculate it from the current key. This is necessary in two cases:
// - the events have never been pruned before because they are not yet 24h old.
// - this is the first event after an II upgrade from a version that did not have this
// state variable to track the beginning of the 24h window.
prune_window_start_from_current_key(current_key, db)
});

let Some(prune_start_key) = prune_window_start else {
// there is no key to start pruning from, so the list of pruned events is empty.
return vec![];
};

// Always aim to prune up to 24h ago, event if past attempts did not manage due to the
frederikrothenberger marked this conversation as resolved.
Show resolved Hide resolved
// MAX_EVENTS_TO_PRUNE limit.
let prune_window_end = now - DAY_NS;
let events = db
.range(prune_start_key..=EventKey::max_key(prune_window_end))
.take(MAX_EVENTS_TO_PRUNE)
.collect::<Vec<_>>();

// update the persistent state with they key _after_ the one being pointed to by the last event
// so that the next amortized pruning can continue from there.
if let Some((k, _)) = events.last() {
state::persistent_state_mut(|s| {
s.event_stats_24h_pruning_start = Some(k.next_key());
});
}
events
}

/// Removes events older than the retention period (30d).
/// Returns a vec of tuples of the pruned events and their timestamps.
fn prune_events<M: Memory>(
/// Prunes at most [MAX_EVENTS_TO_PRUNE].
fn prune_events_30d<M: Memory>(
db: &mut StableBTreeMap<EventKey, EventData, M>,
now: Timestamp,
) -> Vec<(EventKey, EventData)> {
const RETENTION_PERIOD: u64 = 30 * DAY_NS;

let pruned_events: Vec<_> = db
.range(..=EventKey::max_key(now - RETENTION_PERIOD))
.take(MAX_EVENTS_TO_PRUNE)
.collect();
for entry in &pruned_events {
let entry: &(EventKey, EventData) = entry;
Expand Down
84 changes: 84 additions & 0 deletions src/internet_identity/src/stats/event_stats/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,90 @@ fn should_prune_monthly_events_after_30d() {
);
}

#[test]
fn should_prune_at_most_100_events_24h() {
    let mut storage = test_storage();
    let sample_event = EventData {
        event: Event::PrepareDelegation(PrepareDelegationEvent {
            ii_domain: Some(IIDomain::Ic0App),
            frontend: EXAMPLE_URL.to_string(),
            session_duration_ns: to_ns(SESS_DURATION_SEC),
        }),
    };
    let day_aggregation_key = AggregationKey::new(
        PrepareDelegationCount,
        Day,
        Some(IIDomain::Ic0App),
        EXAMPLE_URL.to_string(),
    );

    // Record 107 events within the same 24h window.
    for _ in 0..107 {
        update_events_internal(sample_event.clone(), TIMESTAMP, &mut storage);
    }

    // One more event, a day later, triggers amortized 24h pruning.
    update_events_internal(sample_event.clone(), TIMESTAMP + DAY_NS, &mut storage);
    assert_event_count_consistent(&mut storage);

    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
    // --> 8 expected events
    assert_eq!(
        storage.event_aggregations.get(&day_aggregation_key).unwrap(),
        8
    );
    assert_eq!(
        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
        EventKey {
            time: TIMESTAMP,
            counter: 100
        }
    );

    // Prune again, after another 24h leaving only the event that triggered the pruning
    // --> 1 expected events
    update_events_internal(sample_event.clone(), TIMESTAMP + 2 * DAY_NS, &mut storage);
    assert_event_count_consistent(&mut storage);
    assert_eq!(
        storage.event_aggregations.get(&day_aggregation_key).unwrap(),
        1
    );
    assert_eq!(
        persistent_state(|s| s.event_stats_24h_pruning_start.clone()).unwrap(),
        EventKey {
            time: TIMESTAMP + DAY_NS,
            counter: 108
        }
    );
}

#[test]
fn should_prune_at_most_100_events_30d() {
    let mut storage = test_storage();
    let sample_event = EventData {
        event: Event::PrepareDelegation(PrepareDelegationEvent {
            ii_domain: Some(IIDomain::Ic0App),
            frontend: EXAMPLE_URL.to_string(),
            session_duration_ns: to_ns(SESS_DURATION_SEC),
        }),
    };
    let month_aggregation_key = AggregationKey::new(
        PrepareDelegationCount,
        Month,
        Some(IIDomain::Ic0App),
        EXAMPLE_URL.to_string(),
    );

    // Record 107 events at the same timestamp.
    for _ in 0..107 {
        update_events_internal(sample_event.clone(), TIMESTAMP, &mut storage);
    }

    // One more event, 30 days later, triggers amortized 30d pruning.
    update_events_internal(sample_event.clone(), TIMESTAMP + 30 * DAY_NS, &mut storage);
    assert_event_count_consistent(&mut storage);

    // Of the 107 initial events, 100 should be pruned, 1 was added to trigger the pruning
    // --> 8 expected events
    assert_eq!(
        storage.event_aggregations.get(&month_aggregation_key).unwrap(),
        8
    );
    assert_eq!(storage.event_data.len(), 8);

    // Prune again, after another 30d leaving only the event that triggered the pruning
    // --> 1 expected events
    update_events_internal(sample_event.clone(), TIMESTAMP + 60 * DAY_NS, &mut storage);
    assert_event_count_consistent(&mut storage);
    assert_eq!(
        storage.event_aggregations.get(&month_aggregation_key).unwrap(),
        1
    );
    assert_eq!(storage.event_data.len(), 1);
}

#[test]
fn should_account_for_dapps_changing_session_lifetime() {
let mut storage = test_storage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::stats::activity_stats::activity_counter::active_anchor_counter::Activ
use crate::stats::activity_stats::activity_counter::authn_method_counter::AuthnMethodCounter;
use crate::stats::activity_stats::activity_counter::domain_active_anchor_counter::DomainActiveAnchorCounter;
use crate::stats::activity_stats::ActivityStats;
use crate::stats::event_stats::EventKey;
use candid::{CandidType, Deserialize};
use ic_stable_structures::storable::Bound;
use ic_stable_structures::Storable;
Expand All @@ -30,6 +31,7 @@ pub struct StorablePersistentState {
event_data_count: Option<u64>,
// opt for backwards compatibility
event_aggregations_count: Option<u64>,
event_stats_24h_pruning_start: Option<EventKey>,
}

impl Storable for StorablePersistentState {
Expand Down Expand Up @@ -66,6 +68,7 @@ impl From<PersistentState> for StorablePersistentState {
max_inflight_captchas: s.max_inflight_captchas,
event_data_count: Some(s.event_data_count),
event_aggregations_count: Some(s.event_aggregations_count),
event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
}
}
}
Expand All @@ -82,6 +85,7 @@ impl From<StorablePersistentState> for PersistentState {
max_inflight_captchas: s.max_inflight_captchas,
event_data_count: s.event_data_count.unwrap_or_default(),
event_aggregations_count: s.event_aggregations_count.unwrap_or_default(),
event_stats_24h_pruning_start: s.event_stats_24h_pruning_start,
}
}
}
Expand Down Expand Up @@ -120,6 +124,7 @@ mod tests {
max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
event_data_count: Some(0),
event_aggregations_count: Some(0),
event_stats_24h_pruning_start: None,
};

assert_eq!(StorablePersistentState::default(), expected_defaults);
Expand All @@ -137,6 +142,7 @@ mod tests {
max_inflight_captchas: DEFAULT_MAX_INFLIGHT_CAPTCHAS,
event_data_count: 0,
event_aggregations_count: 0,
event_stats_24h_pruning_start: None,
};
assert_eq!(PersistentState::default(), expected_defaults);
}
Expand Down
Loading
Loading