Skip to content

Commit

Permalink
fix(subscriber): Don't save poll_ops if no-one is receiving them (#501)
Browse files Browse the repository at this point in the history
Do not record poll_ops if there are no current connected clients
(watchers). Without this `Aggregator::poll_ops` would grow forever.

Follow up to #311 and
fix for these two:
- #184
- #500

Fixes #184 

Co-authored-by: Graham King <grahamk@nvidia.com>
Co-authored-by: Hayden Stainsby <hds@caffeineconcepts.com>
  • Loading branch information
3 people authored Jan 22, 2024
1 parent aab98e7 commit 1656c79
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};
use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};

use std::{
sync::{
atomic::{AtomicBool, Ordering::*},
Arc,
},
time::{Duration, Instant},
};

use console_api as proto;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
use tracing_core::{span::Id, Metadata};

use super::{Command, Event, Shared, Watch};
use crate::{
stats::{self, Unsent},
ToProto, WatchRequest,
};

mod id_data;
mod shrink;
use self::id_data::{IdData, Include};
Expand Down Expand Up @@ -269,6 +270,9 @@ impl Aggregator {
.drop_closed(&mut self.resource_stats, now, self.retention, has_watchers);
self.async_ops
.drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers);
if !has_watchers {
self.poll_ops.clear();
}
}

/// Add the task subscription to the watchers after sending the first update
Expand Down Expand Up @@ -305,14 +309,10 @@ impl Aggregator {
}

fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => self.poll_ops.clone(),
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
};
proto::resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
stats_update: self.resource_stats.as_proto(include, &self.base_time),
new_poll_ops,
new_poll_ops: std::mem::take(&mut self.poll_ops),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}
}
Expand Down Expand Up @@ -472,6 +472,10 @@ impl Aggregator {
task_id,
is_ready,
} => {
// CLI doesn't show historical poll ops, so don't save them if no-one is watching
if self.watchers.is_empty() {
return;
}
let poll_op = proto::resources::PollOp {
metadata: Some(metadata.into()),
resource_id: Some(resource_id.into()),
Expand Down

0 comments on commit 1656c79

Please sign in to comment.