Skip to content

Commit

Permalink
fix(subscriber): fix memory leak from historical PollOps (#311)
Browse files Browse the repository at this point in the history
## Motivation

Currently, `console-subscriber` retains all `PollOp`s that have been
recorded since the application started, and sends all of them in the
first resource update for a new subscription.

I don't think this is correct, for a few reasons. It's necessary to send
all currently live tasks and resources to new subscriptions, because
future events may reference those tasks or resources. This isn't the
case for `PollOp`s --- unlike tasks, resources, and even async ops, a
`PollOp` doesn't represent an *object*, it represents an *event*, a
single time an object was polled. There's no reason to send all previous
poll ops in the first update for a new subscription.

Storing all the poll ops that have ever occurred results in a memory
leak (see #256); unlike other tracked entities, we never clear anything
out of the vector of all poll ops that have ever occurred in the
program.

## Solution

This branch just removes the `all_poll_ops` `Vec` entirely, and changes
the subscriber to only send new poll ops in the first update.

We *could* change this so that poll ops are associated with a timestamp,
and older ones are cleared out of a vec of all poll ops when they time
out. However, the `tokio-console` CLI doesn't currently have a way of
showing historical poll ops _anyway_, so there's no reason to keep this
data. If that changes, we can put them back, but for now, the simplest
solution is to just remove this.

Fixes #256
  • Loading branch information
hawkw authored Mar 14, 2022
1 parent 3c55912 commit 9178ecf
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,10 @@ pub(crate) struct Aggregator {
/// Map of AsyncOp IDs to AsyncOp stats.
async_op_stats: IdData<Arc<stats::AsyncOpStats>>,

/// *All* PollOp events for AsyncOps on Resources.
///
/// This is sent to new clients as part of the initial state.
// TODO: drop the poll ops for async ops that have been dropped
all_poll_ops: ShrinkVec<proto::resources::PollOp>,

/// *New* PollOp events that whave occurred since the last update
/// `PollOp `events that have occurred since the last update
///
/// This is emptied on every state update.
new_poll_ops: Vec<proto::resources::PollOp>,
poll_ops: Vec<proto::resources::PollOp>,

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,
Expand Down Expand Up @@ -157,8 +151,7 @@ impl Aggregator {
resource_stats: IdData::default(),
async_ops: IdData::default(),
async_op_stats: IdData::default(),
all_poll_ops: Default::default(),
new_poll_ops: Default::default(),
poll_ops: Default::default(),
temporality: Temporality::Live,
base_time,
}
Expand Down Expand Up @@ -292,8 +285,8 @@ impl Aggregator {

fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate {
let new_poll_ops = match include {
Include::All => (*self.all_poll_ops).clone(),
Include::UpdatedOnly => std::mem::take(&mut self.new_poll_ops),
Include::All => self.poll_ops.clone(),
Include::UpdatedOnly => std::mem::take(&mut self.poll_ops),
};
proto::resources::ResourceUpdate {
new_resources: self.resources.as_proto_list(include, &self.base_time),
Expand Down Expand Up @@ -465,8 +458,7 @@ impl Aggregator {
is_ready,
};

self.all_poll_ops.push(poll_op.clone());
self.new_poll_ops.push(poll_op);
self.poll_ops.push(poll_op);
}

Event::AsyncResourceOp {
Expand Down

0 comments on commit 9178ecf

Please sign in to comment.