From 7472b3adac99c0e92ef16c67508ca7716217fefa Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Thu, 27 Jun 2024 20:24:05 +0200 Subject: [PATCH] Avoid multiple QueueId HashMaps for RuntimeInfo Signed-off-by: Bob Weinand --- sidecar/src/service/runtime_info.rs | 41 +++--- sidecar/src/service/sidecar_server.rs | 181 +++++++++++++++----------- sidecar/src/service/telemetry/mod.rs | 3 + 3 files changed, 127 insertions(+), 98 deletions(-) diff --git a/sidecar/src/service/runtime_info.rs b/sidecar/src/service/runtime_info.rs index 9f2f8215d..97819c933 100644 --- a/sidecar/src/service/runtime_info.rs +++ b/sidecar/src/service/runtime_info.rs @@ -6,6 +6,7 @@ use crate::service::{ telemetry::{AppInstance, AppOrQueue}, InstanceId, QueueId, }; +use ddcommon::tag::Tag; use futures::{ future::{self, join_all, Shared}, FutureExt, @@ -27,18 +28,26 @@ pub(crate) struct SharedAppManualFut { /// `RuntimeInfo` is a struct that contains information about a runtime. /// It contains a map of apps and a map of app or actions. -/// Each app is represented by a shared future that may contain an `Option`. -/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions are necessary -/// because service and env names are not known until later in the initialization process. #[derive(Clone, Default)] pub(crate) struct RuntimeInfo { pub(crate) apps: Arc>, - app_or_actions: Arc>>, + applications: Arc>>, #[cfg(feature = "tracing")] - remote_config_guards: Arc>>, pub(crate) instance_id: InstanceId, } +/// `ActiveApplications` is a struct the contains information about a known in flight application. +/// Telemetry lifecycles (see `app_or_actions`) and remote_config `remote_config_guard` are bound to +/// it. +/// Each app is represented by a shared future that may contain an `Option`. +/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions are necessary +/// because service and env names are not known until later in the initialization process. +#[derive(Default)] +pub(crate) struct ActiveApplication { + pub app_or_actions: AppOrQueue, + pub remote_config_guard: Option, +} + impl RuntimeInfo { /// Retrieves the `AppInstance` for a given service name and environment name. /// @@ -115,26 +124,14 @@ impl RuntimeInfo { self.apps.lock().unwrap() } - /// Locks the app or actions map and returns a mutable reference to it. - /// - /// # Returns - /// - /// * `MutexGuard>` - A mutable reference to the app or actions - /// map. - pub(crate) fn lock_app_or_actions(&self) -> MutexGuard> { - self.app_or_actions.lock().unwrap() - } - - /// Locks the remote config guards map and returns a mutable reference to it. + /// Locks the applications map and returns a mutable reference to it. /// /// # Returns /// - /// * `MutexGuard>` - A mutable reference to the remote - /// config guards map. - pub(crate) fn lock_remote_config_guards( - &self, - ) -> MutexGuard> { - self.remote_config_guards.lock().unwrap() + /// * `MutexGuard>` - A mutable reference to the + /// applications map. + pub(crate) fn lock_applications(&self) -> MutexGuard> { + self.applications.lock().unwrap() } } diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 077f95bbd..26e4b69b1 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -40,6 +40,7 @@ use tokio::task::{JoinError, JoinHandle}; use crate::dogstatsd::DogStatsDAction; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; +use crate::service::runtime_info::ActiveApplication; use crate::service::telemetry::enqueued_telemetry_stats::EnqueuedTelemetryStats; use crate::service::tracing::trace_flusher::TraceFlusherStats; use datadog_ipc::platform::FileBackedHandle; @@ -62,6 +63,7 @@ struct SidecarStats { active_apps: u32, enqueued_apps: u32, enqueued_telemetry_data: EnqueuedTelemetryStats, + remote_config_clients: u32, telemetry_metrics_contexts: u32, telemetry_worker: TelemetryWorkerStats, telemetry_worker_errors: u32, @@ -167,7 +169,7 @@ impl SidecarServer { async fn process_interceptor_response( &self, - result: Result<(HashSet, HashSet), tokio::task::JoinError>, + result: Result<(HashSet, HashSet), JoinError>, ) { match result { Ok((sessions, instances)) => { @@ -396,7 +398,7 @@ impl SidecarServer { .map(|s| { s.lock_runtimes() .values() - .map(|r| r.lock_app_or_actions().len() as u32) + .map(|r| r.lock_applications().len() as u32) .sum::() }) .sum(), @@ -406,9 +408,9 @@ impl SidecarServer { s.lock_runtimes() .values() .map(|r| { - r.lock_app_or_actions() + r.lock_applications() .values() - .filter(|a| matches!(a, AppOrQueue::Queue(_))) + .filter(|a| matches!(a.app_or_actions, AppOrQueue::Queue(_))) .count() as u32 }) .sum::() @@ -420,9 +422,9 @@ impl SidecarServer { s.lock_runtimes() .values() .map(|r| { - r.lock_app_or_actions() + r.lock_applications() .values() - .filter_map(|a| match a { + .filter_map(|a| match &a.app_or_actions { AppOrQueue::Queue(q) => Some(q.stats()), _ => None, }) @@ -431,6 +433,20 @@ impl SidecarServer { .sum() }) .sum(), + remote_config_clients: sessions + .values() + .map(|s| { + s.lock_runtimes() + .values() + .map(|r| { + r.lock_applications() + .values() + .filter_map(|a| a.remote_config_guard.as_ref()) + .count() as u32 + }) + .sum::() + }) + .sum(), telemetry_metrics_contexts: sessions .values() .map(|s| { @@ -476,58 +492,68 @@ impl SidecarInterface for SidecarServer { actions: Vec, ) -> Self::EnqueueActionsFut { let rt_info = self.get_runtime(&instance_id); - let mut queue = rt_info.lock_app_or_actions(); - match queue.entry(queue_id) { - Entry::Occupied(mut entry) => match entry.get_mut() { - AppOrQueue::Queue(ref mut data) => { - data.process(actions); - } - AppOrQueue::App(service_future) => { - let service_future = service_future.clone(); - // drop on stop - if actions.iter().any(|action| { - matches!( - action, - SidecarAction::Telemetry(TelemetryActions::Lifecycle( - LifecycleAction::Stop - )) - ) - }) { - entry.remove(); - rt_info.lock_remote_config_guards().remove(&queue_id); + let mut applications = rt_info.lock_applications(); + match applications.entry(queue_id) { + Entry::Occupied(mut entry) => { + let value = entry.get_mut(); + match value.app_or_actions { + AppOrQueue::Inactive => { + value.app_or_actions = + AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)); } - let apps = rt_info.apps.clone(); - tokio::spawn(async move { - let service = service_future.await; - let app_future = if let Some(fut) = apps - .lock() - .expect("Unable to acquire lock on apps") - .get(&service) - { - fut.clone() - } else { - return; - }; - if let Some(mut app) = app_future.await { - let actions = - EnqueuedTelemetryData::process_immediately(actions, &mut app).await; - app.telemetry.send_msgs(actions).await.ok(); + AppOrQueue::Queue(ref mut data) => { + data.process(actions); + } + AppOrQueue::App(ref service_future) => { + let service_future = service_future.clone(); + // drop on stop + if actions.iter().any(|action| { + matches!( + action, + SidecarAction::Telemetry(TelemetryActions::Lifecycle( + LifecycleAction::Stop + )) + ) + }) { + entry.remove(); } - }); + let apps = rt_info.apps.clone(); + tokio::spawn(async move { + let service = service_future.await; + let app_future = if let Some(fut) = apps + .lock() + .expect("Unable to acquire lock on apps") + .get(&service) + { + fut.clone() + } else { + return; + }; + if let Some(mut app) = app_future.await { + let actions = + EnqueuedTelemetryData::process_immediately(actions, &mut app) + .await; + app.telemetry.send_msgs(actions).await.ok(); + } + }); + } } - }, + } Entry::Vacant(entry) => { - if actions.len() == 1 - && matches!( + if actions.len() != 1 + || !matches!( actions[0], SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop )) ) { - rt_info.lock_remote_config_guards().remove(&queue_id); - } else { - entry.insert(AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions))); + entry.insert(ActiveApplication { + app_or_actions: AppOrQueue::Queue(EnqueuedTelemetryData::processed( + actions, + )), + ..Default::default() + }); } } } @@ -549,12 +575,17 @@ impl SidecarInterface for SidecarServer { let (future, completer) = ManualFuture::new(); let app_or_queue = { let rt_info = self.get_runtime(&instance_id); - let mut app_or_actions = rt_info.lock_app_or_actions(); - match app_or_actions.get(&queue_id) { - Some(AppOrQueue::Queue(_)) => { - app_or_actions.insert(queue_id, AppOrQueue::App(future.shared())) - } - None => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())), + let mut applications = rt_info.lock_applications(); + match applications.get_mut(&queue_id) { + Some(ActiveApplication { + app_or_actions: ref mut app @ AppOrQueue::Queue(_), + .. + }) => Some(std::mem::replace(app, AppOrQueue::App(future.shared()))), + None + | Some(ActiveApplication { + app_or_actions: AppOrQueue::Inactive, + .. + }) => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())), _ => None, } }; @@ -590,10 +621,7 @@ impl SidecarInterface for SidecarServer { matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop)) }) { self.get_runtime(&instance_id) - .lock_app_or_actions() - .remove(&queue_id); - self.get_runtime(&instance_id) - .lock_remote_config_guards() + .lock_applications() .remove(&queue_id); } @@ -785,24 +813,25 @@ impl SidecarInterface for SidecarServer { let notify_target = RemoteConfigNotifyTarget { pid: session.pid.load(Ordering::Relaxed), }; - session - .get_runtime(&instance_id.runtime_id) - .lock_remote_config_guards() - .insert( - queue_id, - self.remote_configs.add_runtime( - session - .get_remote_config_invariants() - .as_ref() - .expect("Expecting remote config invariants to be set early") - .clone(), - instance_id.runtime_id, - notify_target, - env_name, - service_name, - app_version, - ), - ); + let runtime_info = session.get_runtime(&instance_id.runtime_id); + runtime_info + .lock_applications() + .entry(queue_id) + .or_default() + .remote_config_guard = Some( + self.remote_configs.add_runtime( + session + .get_remote_config_invariants() + .as_ref() + .expect("Expecting remote config invariants to be set early") + .clone(), + instance_id.runtime_id, + notify_target, + env_name, + service_name, + app_version, + ), + ); no_response() } diff --git a/sidecar/src/service/telemetry/mod.rs b/sidecar/src/service/telemetry/mod.rs index a609f5d6c..99d61a218 100644 --- a/sidecar/src/service/telemetry/mod.rs +++ b/sidecar/src/service/telemetry/mod.rs @@ -11,7 +11,10 @@ pub mod enqueued_telemetry_data; pub mod enqueued_telemetry_stats; #[allow(clippy::large_enum_variant)] +#[derive(Default)] pub(crate) enum AppOrQueue { + #[default] + Inactive, App(Shared>), Queue(EnqueuedTelemetryData), }