From 01923dbb0f5cdb64ae04bfd49972e9a86cf9001e Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Sat, 28 Oct 2023 21:35:11 -0400 Subject: [PATCH] Remove agent name from metrics --- src/server/metrics.rs | 59 +++++++++++++++----------------------- src/server/routes/agent.rs | 9 +++--- 2 files changed, 27 insertions(+), 41 deletions(-) diff --git a/src/server/metrics.rs b/src/server/metrics.rs index 21ac6554..2f3b98d9 100644 --- a/src/server/metrics.rs +++ b/src/server/metrics.rs @@ -3,7 +3,6 @@ use crate::experiments::{Assignee, Experiment}; use crate::prelude::*; use crate::server::agents::Agent; use chrono::{DateTime, Utc}; -use prometheus::proto::{Metric, MetricFamily}; use prometheus::{HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; const JOBS_METRIC: &str = "crater_completed_jobs_total"; @@ -26,7 +25,7 @@ impl Metrics { pub fn new() -> Fallible { let jobs_opts = prometheus::opts!(JOBS_METRIC, "total completed jobs"); let crater_completed_jobs_total = - prometheus::register_int_counter_vec!(jobs_opts, &["agent", "experiment"])?; + prometheus::register_int_counter_vec!(jobs_opts, &["experiment"])?; let crater_bounced_record_progress = prometheus::register_int_counter!( "crater_bounced_record_progress", "hits with full record progress queue" @@ -63,39 +62,15 @@ impl Metrics { .inc_by(1); } - pub fn record_completed_jobs(&self, agent: &str, experiment: &str, amount: u64) { + pub fn record_completed_jobs(&self, experiment: &str, amount: u64) { self.crater_completed_jobs_total - .with_label_values(&[agent, experiment]) + .with_label_values(&[experiment]) .inc_by(amount); } - fn get_metric_by_name(name: &str) -> Option { - let families = prometheus::gather(); - families.into_iter().find(|fam| fam.get_name() == name) - } - - fn get_label_by_name<'a>(metric: &'a Metric, label: &str) -> Option<&'a str> { - metric - .get_label() - .iter() - .find(|lab| lab.get_name() == label) - .map(|lab| lab.get_value()) - } - fn remove_experiment_jobs(&self, experiment: &str) -> Fallible<()> { - if let Some(metric) = Self::get_metric_by_name(JOBS_METRIC) { - let agents = metric - .get_metric() - .iter() - .filter(|met| Self::get_label_by_name(met, "experiment").unwrap() == experiment) - .map(|met| Self::get_label_by_name(met, "agent").unwrap()) - .collect::>(); - - for agent in agents.iter() { - self.crater_completed_jobs_total - .remove_label_values(&[agent, experiment])?; - } - } + self.crater_completed_jobs_total + .remove_label_values(&[experiment])?; Ok(()) } @@ -143,12 +118,27 @@ mod tests { use crate::server::tokens::Tokens; use chrono::Utc; use lazy_static::lazy_static; - use prometheus::proto::MetricFamily; + use prometheus::proto::{Metric, MetricFamily}; lazy_static! { static ref METRICS: Metrics = Metrics::new().unwrap(); } + impl Metrics { + fn get_metric_by_name(name: &str) -> Option { + let families = prometheus::gather(); + families.into_iter().find(|fam| fam.get_name() == name) + } + + fn get_label_by_name<'a>(metric: &'a Metric, label: &str) -> Option<&'a str> { + metric + .get_label() + .iter() + .find(|lab| lab.get_name() == label) + .map(|lab| lab.get_value()) + } + } + fn test_experiment_presence(metric: &MetricFamily, experiment: &str) -> bool { metric .get_metric() @@ -160,12 +150,9 @@ mod tests { fn test_on_complete_experiment() { let ex1 = "pr-0"; let ex2 = "pr-1"; - let agent1 = "agent-1"; - let agent2 = "agent-2"; - METRICS.record_completed_jobs(agent1, ex1, 1); - METRICS.record_completed_jobs(agent2, ex1, 1); - METRICS.record_completed_jobs(agent2, ex2, 1); + METRICS.record_completed_jobs(ex1, 1); + METRICS.record_completed_jobs(ex2, 1); //test metrics are correctly registered let jobs = Metrics::get_metric_by_name(JOBS_METRIC).unwrap(); diff --git a/src/server/routes/agent.rs b/src/server/routes/agent.rs index 100dea32..e820f033 100644 --- a/src/server/routes/agent.rs +++ b/src/server/routes/agent.rs @@ -167,7 +167,7 @@ fn endpoint_next_crate( #[derive(Clone)] pub struct RecordProgressThread { // String is the worker name - queue: Sender<(ExperimentData, String)>, + queue: Sender>, in_flight_requests: Arc<(Mutex, Condvar)>, } @@ -189,7 +189,7 @@ impl RecordProgressThread { // Panics should already be logged and otherwise there's not much we // can/should do. let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - let (result, worker_name) = rx.recv().unwrap(); + let result = rx.recv().unwrap(); this.block_until_idle(); let start = std::time::Instant::now(); @@ -204,7 +204,6 @@ impl RecordProgressThread { } metrics.record_completed_jobs( - &worker_name, &ex.name, result.data.results.len() as u64, ); @@ -300,12 +299,12 @@ impl Drop for RequestGuard { fn endpoint_record_progress( result: ExperimentData, data: Arc, - auth: AuthDetails, + _auth: AuthDetails, ) -> Fallible> { match data .record_progress_worker .queue - .try_send((result, auth.name)) + .try_send(result) { Ok(()) => Ok(ApiResponse::Success { result: true }.into_response()?), Err(crossbeam_channel::TrySendError::Full(_)) => {