Skip to content

Commit

Permalink
Avoid multiple QueueId HashMaps for RuntimeInfo
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Jun 27, 2024
1 parent 95c7998 commit 7472b3a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 98 deletions.
41 changes: 19 additions & 22 deletions sidecar/src/service/runtime_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::service::{
telemetry::{AppInstance, AppOrQueue},
InstanceId, QueueId,
};
use ddcommon::tag::Tag;
use futures::{
future::{self, join_all, Shared},
FutureExt,
Expand All @@ -27,18 +28,26 @@ pub(crate) struct SharedAppManualFut {

/// `RuntimeInfo` is a struct that contains information about a runtime.
/// It contains a map of apps and a map of app or actions.
/// Each app is represented by a shared future that may contain an `Option<AppInstance>`.
/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions is necessary
/// because service and env names are not known until later in the initialization process.
#[derive(Clone, Default)]
pub(crate) struct RuntimeInfo {
pub(crate) apps: Arc<Mutex<AppMap>>,
app_or_actions: Arc<Mutex<HashMap<QueueId, AppOrQueue>>>,
applications: Arc<Mutex<HashMap<QueueId, ActiveApplication>>>,
#[cfg(feature = "tracing")]
remote_config_guards: Arc<Mutex<HashMap<QueueId, RemoteConfigsGuard>>>,
pub(crate) instance_id: InstanceId,
}

/// `ActiveApplication` is a struct that contains information about a known in-flight application.
/// The telemetry lifecycle (see `app_or_actions`) and the remote config registration (see
/// `remote_config_guard`) are bound to it.
/// Each app is represented by a shared future that may contain an `Option<AppInstance>`.
/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions is necessary
/// because service and env names are not known until later in the initialization process.
#[derive(Default)]
pub(crate) struct ActiveApplication {
/// Telemetry state of this application, expressed as an `AppOrQueue`.
pub app_or_actions: AppOrQueue,
/// Remote config guard held for this application; `None` (the default) until one is stored.
// NOTE(review): presumably dropping the guard releases the remote config registration —
// confirm against `RemoteConfigsGuard`.
pub remote_config_guard: Option<RemoteConfigsGuard>,
}

impl RuntimeInfo {
/// Retrieves the `AppInstance` for a given service name and environment name.
///
Expand Down Expand Up @@ -115,26 +124,14 @@ impl RuntimeInfo {
self.apps.lock().unwrap()
}

/// Locks the app or actions map and returns a mutable reference to it.
///
/// # Returns
///
/// * `MutexGuard<HashMap<QueueId, AppOrQueue>>` - A mutable reference to the app or actions
/// map.
pub(crate) fn lock_app_or_actions(&self) -> MutexGuard<HashMap<QueueId, AppOrQueue>> {
self.app_or_actions.lock().unwrap()
}

/// Locks the remote config guards map and returns a mutable reference to it.
/// Locks the applications map and returns a mutable reference to it.
///
/// # Returns
///
/// * `MutexGuard<HashMap<QueueId, RemoteConfigsGuard>>` - A mutable reference to the remote
/// config guards map.
pub(crate) fn lock_remote_config_guards(
&self,
) -> MutexGuard<HashMap<QueueId, RemoteConfigsGuard>> {
self.remote_config_guards.lock().unwrap()
/// * `MutexGuard<HashMap<QueueId, ActiveApplication>>` - A mutable reference to the
/// applications map.
pub(crate) fn lock_applications(&self) -> MutexGuard<HashMap<QueueId, ActiveApplication>> {
self.applications.lock().unwrap()
}
}

Expand Down
181 changes: 105 additions & 76 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tokio::task::{JoinError, JoinHandle};

use crate::dogstatsd::DogStatsDAction;
use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs};
use crate::service::runtime_info::ActiveApplication;
use crate::service::telemetry::enqueued_telemetry_stats::EnqueuedTelemetryStats;
use crate::service::tracing::trace_flusher::TraceFlusherStats;
use datadog_ipc::platform::FileBackedHandle;
Expand All @@ -62,6 +63,7 @@ struct SidecarStats {
active_apps: u32,
enqueued_apps: u32,
enqueued_telemetry_data: EnqueuedTelemetryStats,
remote_config_clients: u32,
telemetry_metrics_contexts: u32,
telemetry_worker: TelemetryWorkerStats,
telemetry_worker_errors: u32,
Expand Down Expand Up @@ -167,7 +169,7 @@ impl SidecarServer {

async fn process_interceptor_response(
&self,
result: Result<(HashSet<String>, HashSet<InstanceId>), tokio::task::JoinError>,
result: Result<(HashSet<String>, HashSet<InstanceId>), JoinError>,
) {
match result {
Ok((sessions, instances)) => {
Expand Down Expand Up @@ -396,7 +398,7 @@ impl SidecarServer {
.map(|s| {
s.lock_runtimes()
.values()
.map(|r| r.lock_app_or_actions().len() as u32)
.map(|r| r.lock_applications().len() as u32)
.sum::<u32>()
})
.sum(),
Expand All @@ -406,9 +408,9 @@ impl SidecarServer {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_app_or_actions()
r.lock_applications()
.values()
.filter(|a| matches!(a, AppOrQueue::Queue(_)))
.filter(|a| matches!(a.app_or_actions, AppOrQueue::Queue(_)))
.count() as u32
})
.sum::<u32>()
Expand All @@ -420,9 +422,9 @@ impl SidecarServer {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_app_or_actions()
r.lock_applications()
.values()
.filter_map(|a| match a {
.filter_map(|a| match &a.app_or_actions {
AppOrQueue::Queue(q) => Some(q.stats()),
_ => None,
})
Expand All @@ -431,6 +433,20 @@ impl SidecarServer {
.sum()
})
.sum(),
remote_config_clients: sessions
.values()
.map(|s| {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_applications()
.values()
.filter_map(|a| a.remote_config_guard.as_ref())
.count() as u32
})
.sum::<u32>()
})
.sum(),
telemetry_metrics_contexts: sessions
.values()
.map(|s| {
Expand Down Expand Up @@ -476,58 +492,68 @@ impl SidecarInterface for SidecarServer {
actions: Vec<SidecarAction>,
) -> Self::EnqueueActionsFut {
let rt_info = self.get_runtime(&instance_id);
let mut queue = rt_info.lock_app_or_actions();
match queue.entry(queue_id) {
Entry::Occupied(mut entry) => match entry.get_mut() {
AppOrQueue::Queue(ref mut data) => {
data.process(actions);
}
AppOrQueue::App(service_future) => {
let service_future = service_future.clone();
// drop on stop
if actions.iter().any(|action| {
matches!(
action,
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop
))
)
}) {
entry.remove();
rt_info.lock_remote_config_guards().remove(&queue_id);
let mut applications = rt_info.lock_applications();
match applications.entry(queue_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
match value.app_or_actions {
AppOrQueue::Inactive => {
value.app_or_actions =
AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions));
}
let apps = rt_info.apps.clone();
tokio::spawn(async move {
let service = service_future.await;
let app_future = if let Some(fut) = apps
.lock()
.expect("Unable to acquire lock on apps")
.get(&service)
{
fut.clone()
} else {
return;
};
if let Some(mut app) = app_future.await {
let actions =
EnqueuedTelemetryData::process_immediately(actions, &mut app).await;
app.telemetry.send_msgs(actions).await.ok();
AppOrQueue::Queue(ref mut data) => {
data.process(actions);
}
AppOrQueue::App(ref service_future) => {
let service_future = service_future.clone();
// drop on stop
if actions.iter().any(|action| {
matches!(
action,
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop
))
)
}) {
entry.remove();
}
});
let apps = rt_info.apps.clone();
tokio::spawn(async move {
let service = service_future.await;
let app_future = if let Some(fut) = apps
.lock()
.expect("Unable to acquire lock on apps")
.get(&service)
{
fut.clone()
} else {
return;
};
if let Some(mut app) = app_future.await {
let actions =
EnqueuedTelemetryData::process_immediately(actions, &mut app)
.await;
app.telemetry.send_msgs(actions).await.ok();
}
});
}
}
},
}
Entry::Vacant(entry) => {
if actions.len() == 1
&& matches!(
if actions.len() != 1
|| !matches!(
actions[0],
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop
))
)
{
rt_info.lock_remote_config_guards().remove(&queue_id);
} else {
entry.insert(AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)));
entry.insert(ActiveApplication {
app_or_actions: AppOrQueue::Queue(EnqueuedTelemetryData::processed(
actions,
)),
..Default::default()
});
}
}
}
Expand All @@ -549,12 +575,17 @@ impl SidecarInterface for SidecarServer {
let (future, completer) = ManualFuture::new();
let app_or_queue = {
let rt_info = self.get_runtime(&instance_id);
let mut app_or_actions = rt_info.lock_app_or_actions();
match app_or_actions.get(&queue_id) {
Some(AppOrQueue::Queue(_)) => {
app_or_actions.insert(queue_id, AppOrQueue::App(future.shared()))
}
None => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())),
let mut applications = rt_info.lock_applications();
match applications.get_mut(&queue_id) {
Some(ActiveApplication {
app_or_actions: ref mut app @ AppOrQueue::Queue(_),
..
}) => Some(std::mem::replace(app, AppOrQueue::App(future.shared()))),
None
| Some(ActiveApplication {
app_or_actions: AppOrQueue::Inactive,
..
}) => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())),
_ => None,
}
};
Expand Down Expand Up @@ -590,10 +621,7 @@ impl SidecarInterface for SidecarServer {
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
}) {
self.get_runtime(&instance_id)
.lock_app_or_actions()
.remove(&queue_id);
self.get_runtime(&instance_id)
.lock_remote_config_guards()
.lock_applications()
.remove(&queue_id);
}

Expand Down Expand Up @@ -785,24 +813,25 @@ impl SidecarInterface for SidecarServer {
let notify_target = RemoteConfigNotifyTarget {
pid: session.pid.load(Ordering::Relaxed),
};
session
.get_runtime(&instance_id.runtime_id)
.lock_remote_config_guards()
.insert(
queue_id,
self.remote_configs.add_runtime(
session
.get_remote_config_invariants()
.as_ref()
.expect("Expecting remote config invariants to be set early")
.clone(),
instance_id.runtime_id,
notify_target,
env_name,
service_name,
app_version,
),
);
let runtime_info = session.get_runtime(&instance_id.runtime_id);
runtime_info
.lock_applications()
.entry(queue_id)
.or_default()
.remote_config_guard = Some(
self.remote_configs.add_runtime(
session
.get_remote_config_invariants()
.as_ref()
.expect("Expecting remote config invariants to be set early")
.clone(),
instance_id.runtime_id,
notify_target,
env_name,
service_name,
app_version,
),
);

no_response()
}
Expand Down
3 changes: 3 additions & 0 deletions sidecar/src/service/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ pub mod enqueued_telemetry_data;
pub mod enqueued_telemetry_stats;

/// Telemetry state of an application: inactive, bound to an app future, or holding queued data.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
pub(crate) enum AppOrQueue {
/// Default state: neither an app future nor enqueued telemetry data is present.
#[default]
Inactive,
/// A shared manual future resolving to a pair of strings.
// NOTE(review): presumably the (service name, env name) key of the apps map — confirm against
// the code that completes the future.
App(Shared<ManualFuture<(String, String)>>),
/// Telemetry data enqueued for this application.
Queue(EnqueuedTelemetryData),
}

0 comments on commit 7472b3a

Please sign in to comment.