Skip to content

Commit

Permalink
Merge pull request #511 from cbgbt/prometheus-gauge
Browse files Browse the repository at this point in the history
controller: disable prometheus metric processor memory
  • Loading branch information
cbgbt committed Aug 9, 2023
2 parents 93d14fe + c6af9ed commit 880e401
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 69 deletions.
3 changes: 3 additions & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ snafu = "0.7"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tracing = "0.1"
validator = { version = "0.16", features = ["derive"] }

[dev-dependencies]
maplit = "1"
53 changes: 12 additions & 41 deletions controller/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::metrics;

use super::{
metrics::{BrupopControllerMetrics, BrupopHostsData},
scheduler::BrupopCronScheduler,
metrics::BrupopControllerMetrics, scheduler::BrupopCronScheduler,
statemachine::determine_next_node_spec,
};
use models::constants::{BRUPOP_INTERFACE_VERSION, LABEL_BRUPOP_INTERFACE_NAME};
Expand All @@ -16,7 +17,7 @@ use kube::Api;
use kube::ResourceExt;
use opentelemetry::global;
use snafu::ResultExt;
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::env;
use tokio::time::{sleep, Duration};

Expand Down Expand Up @@ -167,41 +168,12 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {

#[instrument(skip(self))]
fn emit_metrics(&self) -> Result<()> {
let data = self.fetch_data()?;
self.metrics.emit_metrics(data);
let metrics_data = metrics::BrupopHostsData::from_shadows(&self.all_brss())
.context(controllerclient_error::MetricsComputeSnafu)?;
self.metrics.emit_metrics(metrics_data);
Ok(())
}

/// Fetch the custom resources status for all resources
/// to gather the information on hosts's bottlerocket version
/// and brupop state.
#[instrument(skip(self))]
fn fetch_data(&self) -> Result<BrupopHostsData> {
let mut hosts_version_count_map = HashMap::new();
let mut hosts_state_count_map = HashMap::new();

for brs in self.all_brss() {
if let Some(brs_status) = brs.status {
let current_version = brs_status.current_version().to_string();
let current_state = brs_status.current_state;

*hosts_version_count_map.entry(current_version).or_default() += 1;
*hosts_state_count_map
.entry(serde_plain::to_string(&current_state).context(
controllerclient_error::AssertionSnafu {
msg: "unable to parse current_state".to_string(),
},
)?)
.or_default() += 1;
}
}

Ok(BrupopHostsData::new(
hosts_version_count_map,
hosts_state_count_map,
))
}

#[instrument(skip(self, nodes, brss_name))]
async fn bottlerocketshadows_cleanup(
&self,
Expand Down Expand Up @@ -633,12 +605,6 @@ pub mod controllerclient_error {
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Controller failed due to {}: '{}'", msg, source))]
Assertion {
msg: String,
source: serde_plain::Error,
},

#[snafu(display("Failed to delete node via kubernetes API: '{}'", source))]
DeleteNode { source: kube::Error },

Expand Down Expand Up @@ -676,6 +642,11 @@ pub mod controllerclient_error {
#[snafu(display("Error creating maintenance time: '{}'", source))]
MaintenanceTimeError { source: scheduler_error::Error },

#[snafu(display("Failed to compute cluster metrics: '{}'", source))]
MetricsCompute {
source: crate::metrics::error::MetricsError,
},

#[snafu(display("Unable to find next scheduled time and sleep: '{}'", source))]
SleepUntilNextSchedule { source: scheduler_error::Error },
}
Expand Down
2 changes: 1 addition & 1 deletion controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<()> {
selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
aggregation::cumulative_temporality_selector(),
)
.with_memory(true),
.with_memory(false),
)
.build();

Expand Down
158 changes: 131 additions & 27 deletions controller/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use models::node::BottlerocketShadow;
use opentelemetry::{metrics::Meter, Key};
use snafu::ResultExt;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use tracing::instrument;

const HOST_VERSION_KEY: Key = Key::from_static_str("bottlerocket_version");
Expand All @@ -12,35 +13,55 @@ pub struct BrupopControllerMetrics {
brupop_shared_hosts_data: Arc<Mutex<BrupopHostsData>>,
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct BrupopHostsData {
hosts_version_count_map: HashMap<String, u64>,
hosts_state_count_map: HashMap<String, u64>,
hosts_version_count: HashMap<String, u64>,
hosts_state_count: HashMap<String, u64>,
}

impl BrupopHostsData {
pub fn new(
hosts_version_count_map: HashMap<String, u64>,
hosts_state_count_map: HashMap<String, u64>,
) -> Self {
BrupopHostsData {
hosts_version_count_map,
hosts_state_count_map,
}
/// Computes point-in-time metrics for the cluster's hosts based on a set of BottlerocketShadows.
pub fn from_shadows(brss: &[BottlerocketShadow]) -> Result<Self, error::MetricsError> {
let mut hosts_version_count = HashMap::new();
let mut hosts_state_count = HashMap::new();

brss.iter()
.filter_map(|brs| brs.status.as_ref())
.try_for_each(|brs_status| {
let host_version = brs_status.current_version().to_string();
let host_state = brs_status.current_state;

*hosts_version_count.entry(host_version).or_default() += 1;
*hosts_state_count
.entry(serde_plain::to_string(&host_state).context(error::SerializeStateSnafu)?)
.or_default() += 1;

Ok(())
})?;
Ok(Self {
hosts_version_count,
hosts_state_count,
})
}
}

impl Default for BrupopHostsData {
fn default() -> Self {
let hosts_version_count_map = HashMap::new();
let hosts_state_count_map = HashMap::new();
BrupopHostsData {
hosts_version_count_map,
hosts_state_count_map,
}
/// Marks all current gauges at 0, then writes the new metrics into the store.
fn update_counters(&mut self, other: &BrupopHostsData) {
update_counter(&mut self.hosts_version_count, &other.hosts_version_count);
update_counter(&mut self.hosts_state_count, &other.hosts_state_count);
}
}

/// Updates a population counter from a stateless input.
///
/// All current state in the counter is set to 0, then new counts are copied from the incoming state.
fn update_counter(base: &mut HashMap<String, u64>, other: &HashMap<String, u64>) {
base.iter_mut().for_each(|(_k, v)| *v = 0);

other.iter().for_each(|(k, v)| {
*base.entry(k.clone()).or_default() = *v;
});
}

impl BrupopControllerMetrics {
#[instrument]
pub fn new(meter: Meter) -> Self {
Expand All @@ -62,16 +83,16 @@ impl BrupopControllerMetrics {

let _ = meter.register_callback(move |cx| {
let data = hosts_data_clone_for_version.lock().unwrap();
for (host_version, count) in &data.hosts_version_count_map {
let labels = vec![HOST_VERSION_KEY.string(host_version.clone())];
for (host_version, count) in &data.hosts_version_count {
let labels = vec![HOST_VERSION_KEY.string(host_version.to_string())];
brupop_hosts_version_observer.observe(cx, *count, &labels);
}
});

let _ = meter.register_callback(move |cx| {
let data = hosts_data_clone_for_state.lock().unwrap();
for (host_state, count) in &data.hosts_state_count_map {
let labels = vec![HOST_STATE_KEY.string(host_state.clone())];
for (host_state, count) in &data.hosts_state_count {
let labels = vec![HOST_STATE_KEY.string(host_state.to_string())];
brupop_hosts_state_observer.observe(cx, *count, &labels);
}
});
Expand All @@ -84,7 +105,90 @@ impl BrupopControllerMetrics {
/// Update shared mut ref to trigger ValueRecorder observe data.
pub fn emit_metrics(&self, data: BrupopHostsData) {
if let Ok(mut host_data) = self.brupop_shared_hosts_data.try_lock() {
*host_data = data;
host_data.update_counters(&data);
}
}
}

pub mod error {
use snafu::Snafu;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum MetricsError {
#[snafu(display("Failed to serialize Shadow state: '{}'", source))]
SerializeState { source: serde_plain::Error },
}
}

#[cfg(test)]
mod test {
use std::collections::HashMap;

use maplit::hashmap;

use crate::metrics::update_counter;

#[test]
fn test_update_counter() {
let test_cases = vec![
(
hashmap! {
"a" => 5,
"b" => 10,
"c" => 15,
},
hashmap! {
"a" => 11,

},
hashmap! {
"a" => 11,
"b" => 0,
"c" => 0,
},
),
(
hashmap! {
"a" => 1,
},
hashmap! {
"b" => 11,
"c" => 12,
},
hashmap! {
"a" => 0,
"b" => 11,
"c" => 12,
},
),
(
hashmap! {
"a" => 1,
},
hashmap! {
"a" => 2,
},
hashmap! {
"a" => 2,
},
),
];

fn stringify(hashmap: HashMap<&str, u64>) -> HashMap<String, u64> {
hashmap
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect()
}

for (base, other, expected) in test_cases.into_iter() {
let mut base = stringify(base);
let other = stringify(other);
let expected = stringify(expected);

update_counter(&mut base, &other);
assert_eq!(&base, &expected);
}
}
}

0 comments on commit 880e401

Please sign in to comment.