Skip to content

Commit

Permalink
slightly refactored bandwidth tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
jstuczyn committed Sep 18, 2024
1 parent 2a6aa13 commit 5753b79
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 23 deletions.
15 changes: 8 additions & 7 deletions common/credential-verification/src/bandwidth_storage_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,14 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
// since we're going to be operating on a fair use policy anyway, even if we crash and let extra few packets
// through, that's completely fine
if self.client_bandwidth.should_sync(self.bandwidth_cfg).await {
let synced_bandwidth = self.sync_storage_bandwidth().await?;
self.client_bandwidth
.update_and_sync_data(synced_bandwidth)
.await
self.sync_storage_bandwidth().await?;
}

Ok(())
}

#[instrument(level = "trace", skip_all)]
async fn sync_storage_bandwidth(&mut self) -> Result<i64> {
async fn sync_storage_bandwidth(&mut self) -> Result<()> {
trace!("syncing client bandwidth with the underlying storage");
let updated = self
.storage
Expand All @@ -119,7 +116,11 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
self.client_bandwidth.delta_since_sync().await,
)
.await?;
Ok(updated)

self.client_bandwidth
.resync_bandwidth_with_storage(updated)
.await;
Ok(())
}

/// Increases the amount of available bandwidth of the connected client by the specified value.
Expand All @@ -134,7 +135,7 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
expiration: OffsetDateTime,
) -> Result<()> {
self.client_bandwidth
.increase_bandwidth_with_flushed(bandwidth.value() as i64, expiration)
.increase_bandwidth(bandwidth.value() as i64, expiration)
.await;

// any increases to bandwidth should get flushed immediately
Expand Down
26 changes: 10 additions & 16 deletions common/credential-verification/src/client_bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct ClientBandwidth {
#[derive(Debug)]
struct ClientBandwidthInner {
pub(crate) bandwidth: AvailableBandwidth,
pub(crate) last_flushed: OffsetDateTime,
pub(crate) last_synced: OffsetDateTime,

/// the number of bytes the client had during the last sync.
/// it is used to determine whether the current value should be synced with the storage
Expand All @@ -39,7 +39,7 @@ impl ClientBandwidth {
ClientBandwidth {
inner: Arc::new(RwLock::new(ClientBandwidthInner {
bandwidth,
last_flushed: OffsetDateTime::now_utc(),
last_synced: OffsetDateTime::now_utc(),
bytes_at_last_sync: bandwidth.bytes,
bytes_delta_since_sync: 0,
})),
Expand All @@ -53,7 +53,7 @@ impl ClientBandwidth {
return true;
}

if guard.last_flushed + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
if guard.last_synced + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
return true;
}

Expand Down Expand Up @@ -82,35 +82,29 @@ impl ClientBandwidth {
guard.bytes_delta_since_sync -= decrease;
}

pub(crate) async fn increase_bandwidth_with_flushed(
&self,
increase: i64,
expiration: OffsetDateTime,
) {
pub(crate) async fn increase_bandwidth(&self, increase: i64, new_expiration: OffsetDateTime) {
let mut guard = self.inner.write().await;

guard.bandwidth.bytes += increase;
guard.bandwidth.expiration = expiration;
guard.last_flushed = OffsetDateTime::now_utc();
guard.bytes_at_last_sync = guard.bandwidth.bytes;
guard.bandwidth.expiration = new_expiration;
guard.bytes_delta_since_sync += increase;
}

pub(crate) async fn expire_bandwidth(&self) {
let mut guard = self.inner.write().await;

guard.bandwidth = AvailableBandwidth::default();
guard.last_flushed = OffsetDateTime::now_utc();
guard.last_synced = OffsetDateTime::now_utc();
guard.bytes_at_last_sync = 0;
guard.bytes_delta_since_sync = 0;
}

pub(crate) async fn update_and_sync_data(&self, updated_bandwidth: i64) {
pub(crate) async fn resync_bandwidth_with_storage(&self, stored: i64) {
let mut guard = self.inner.write().await;

guard.bandwidth.bytes = updated_bandwidth;
guard.bytes_at_last_sync = updated_bandwidth;
guard.bandwidth.bytes = stored;
guard.bytes_at_last_sync = stored;
guard.bytes_delta_since_sync = 0;
guard.last_flushed = OffsetDateTime::now_utc();
guard.last_synced = OffsetDateTime::now_utc();
}
}

0 comments on commit 5753b79

Please sign in to comment.