Skip to content

Commit

Permalink
Add stats and explicitly set poll interval
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
  • Loading branch information
bwoebi committed Sep 19, 2024
1 parent 034cdbd commit 7f40ba3
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 13 deletions.
23 changes: 23 additions & 0 deletions remote-config/src/fetch/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ use ddcommon::{connector, Endpoint};
use http::uri::Scheme;
use hyper::http::uri::PathAndQuery;
use hyper::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256, Sha512};
use std::collections::{HashMap, HashSet};
use std::mem::transmute;
use std::ops::Add;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use tracing::{debug, trace, warn};
Expand Down Expand Up @@ -79,6 +81,21 @@ pub struct ConfigFetcherState<S> {
pub expire_unused_files: bool,
}

#[derive(Default, Serialize, Deserialize)]
pub struct ConfigFetcherStateStats {
pub active_files: u32,
}

impl Add for ConfigFetcherStateStats {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
ConfigFetcherStateStats {
active_files: self.active_files + rhs.active_files,
}
}
}

pub struct ConfigFetcherFilesLock<'a, S> {
inner: MutexGuard<'a, HashMap<Arc<RemoteConfigPath>, StoredTargetFile<S>>>,
}
Expand Down Expand Up @@ -146,6 +163,12 @@ impl<S> ConfigFetcherState<S> {
}
}
}

pub fn stats(&self) -> ConfigFetcherStateStats {
ConfigFetcherStateStats {
active_files: self.target_files_by_path.lock().unwrap().len() as u32,
}
}
}

pub struct ConfigFetcher<S: FileStorage> {
Expand Down
56 changes: 55 additions & 1 deletion remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@

use crate::fetch::{
ConfigApplyState, ConfigFetcherState, ConfigInvariants, FileStorage, RefcountedFile,
RefcountingStorage, SharedFetcher,
RefcountingStorage, RefcountingStorageStats, SharedFetcher,
};
use crate::Target;
use futures_util::future::Shared;
use futures_util::FutureExt;
use manual_future::ManualFuture;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::default::Default;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Add;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -52,6 +54,31 @@ where
fetcher_semaphore: Semaphore,
}

#[derive(Default, Serialize, Deserialize)]
pub struct MultiTargetStats {
known_runtimes: u32,
starting_fetchers: u32,
active_fetchers: u32,
inactive_fetchers: u32,
removing_fetchers: u32,
storage: RefcountingStorageStats,
}

impl Add for MultiTargetStats {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
MultiTargetStats {
known_runtimes: self.known_runtimes + rhs.known_runtimes,
starting_fetchers: self.starting_fetchers + rhs.starting_fetchers,
active_fetchers: self.active_fetchers + rhs.active_fetchers,
inactive_fetchers: self.inactive_fetchers + rhs.inactive_fetchers,
removing_fetchers: self.removing_fetchers + rhs.removing_fetchers,
storage: self.storage + rhs.storage,
}
}
}

enum KnownTargetStatus {
Pending,
Alive,
Expand Down Expand Up @@ -477,6 +504,33 @@ where
pub fn invariants(&self) -> &ConfigInvariants {
self.storage.invariants()
}

pub fn stats(&self) -> MultiTargetStats {
let (starting_fetchers, active_fetchers, inactive_fetchers, removing_fetchers) = {
let services = self.services.lock().unwrap();
let mut starting = 0;
let mut active = 0;
let mut inactive = 0;
let mut removing = 0;
for (_, known_target) in services.iter() {
match *known_target.status.lock().unwrap() {
KnownTargetStatus::Pending => starting += 1,
KnownTargetStatus::Alive => active += 1,
KnownTargetStatus::RemoveAt(_) => inactive += 1,
KnownTargetStatus::Removing(_) => removing += 1,
}
}
(starting, active, inactive, removing)
};
MultiTargetStats {
known_runtimes: self.runtimes.lock().unwrap().len() as u32,
starting_fetchers,
active_fetchers,
inactive_fetchers,
removing_fetchers,
storage: self.storage.stats(),
}
}
}

#[cfg(test)]
Expand Down
30 changes: 28 additions & 2 deletions remote-config/src/fetch/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

use crate::fetch::{
ConfigApplyState, ConfigClientState, ConfigFetcher, ConfigFetcherState, ConfigInvariants,
FileStorage,
ConfigApplyState, ConfigClientState, ConfigFetcher, ConfigFetcherState,
ConfigFetcherStateStats, ConfigInvariants, FileStorage,
};
use crate::{RemoteConfigPath, Target};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ops::Add;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand Down Expand Up @@ -138,6 +140,23 @@ where
run_id: Arc<RunnersGeneration>,
}

#[derive(Default, Serialize, Deserialize)]
pub struct RefcountingStorageStats {
pub inactive_files: u32,
pub fetcher: ConfigFetcherStateStats,
}

impl Add for RefcountingStorageStats {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
RefcountingStorageStats {
inactive_files: self.inactive_files + rhs.inactive_files,
fetcher: self.fetcher + rhs.fetcher,
}
}
}

impl<S: FileStorage + Clone> Clone for RefcountingStorage<S>
where
S::StoredFile: RefcountedFile,
Expand Down Expand Up @@ -191,6 +210,13 @@ where
pub fn invariants(&self) -> &ConfigInvariants {
&self.state.invariants
}

pub fn stats(&self) -> RefcountingStorageStats {
RefcountingStorageStats {
inactive_files: self.inactive.lock().unwrap().len() as u32,
fetcher: self.state.stats(),
}
}
}

impl<S: FileStorage + Clone> FileStorage for RefcountingStorage<S>
Expand Down
4 changes: 4 additions & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
language: ffi::CharSlice,
tracer_version: ffi::CharSlice,
flush_interval_milliseconds: u32,
remote_config_poll_interval_millis: u32,
telemetry_heartbeat_interval_millis: u32,
exception_hash_rate_limiter_seconds: u32,
force_flush_size: usize,
Expand Down Expand Up @@ -515,6 +516,9 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
language: language.to_utf8_lossy().into(),
tracer_version: tracer_version.to_utf8_lossy().into(),
flush_interval: Duration::from_millis(flush_interval_milliseconds as u64),
remote_config_poll_interval: Duration::from_millis(
remote_config_poll_interval_millis as u64
),
telemetry_heartbeat_interval: Duration::from_millis(
telemetry_heartbeat_interval_millis as u64
),
Expand Down
1 change: 1 addition & 0 deletions sidecar/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct SessionConfig {
pub language: String,
pub tracer_version: String,
pub flush_interval: Duration,
pub remote_config_poll_interval: Duration,
pub telemetry_heartbeat_interval: Duration,
pub exception_hash_rate_limiter_seconds: u32,
pub force_flush_size: usize,
Expand Down
14 changes: 13 additions & 1 deletion sidecar/src/service/remote_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::shm_remote_config::{ShmRemoteConfigs, ShmRemoteConfigsGuard};
use datadog_remote_config::fetch::{ConfigInvariants, NotifyTarget};
use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats, NotifyTarget};
use std::collections::hash_map::Entry;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use zwohash::HashMap;

#[cfg(windows)]
Expand Down Expand Up @@ -96,6 +97,7 @@ impl RemoteConfigs {
pub fn add_runtime(
&self,
invariants: ConfigInvariants,
poll_interval: Duration,
runtime_id: String,
notify_target: RemoteConfigNotifyTarget,
env: String,
Expand All @@ -112,6 +114,7 @@ impl RemoteConfigs {
Box::new(move || {
this.lock().unwrap().remove(&invariants);
}),
poll_interval,
))
}
}
Expand All @@ -123,4 +126,13 @@ impl RemoteConfigs {
rc.shutdown();
}
}

pub fn stats(&self) -> MultiTargetStats {
self.0
.lock()
.unwrap()
.values()
.map(|rc| rc.stats())
.fold(MultiTargetStats::default(), |a, b| a + b)
}
}
2 changes: 2 additions & 0 deletions sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(crate) struct SessionInfo {
tracer_config: Arc<Mutex<tracer::Config>>,
dogstatsd: Arc<Mutex<dogstatsd::Flusher>>,
remote_config_invariants: Arc<Mutex<Option<ConfigInvariants>>>,
pub(crate) remote_config_interval: Arc<Mutex<Duration>>,
#[cfg(windows)]
pub(crate) remote_config_notify_function:
Arc<Mutex<crate::service::remote_configs::RemoteConfigNotifyFunction>>,
Expand All @@ -48,6 +49,7 @@ impl Clone for SessionInfo {
tracer_config: self.tracer_config.clone(),
dogstatsd: self.dogstatsd.clone(),
remote_config_invariants: self.remote_config_invariants.clone(),
remote_config_interval: self.remote_config_interval.clone(),
#[cfg(windows)]
remote_config_notify_function: self.remote_config_notify_function.clone(),
log_guard: self.log_guard.clone(),
Expand Down
6 changes: 5 additions & 1 deletion sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats;
use datadog_ipc::platform::FileBackedHandle;
use datadog_ipc::tarpc::server::{Channel, InFlightRequest};
use datadog_live_debugger::sender::DebuggerType;
use datadog_remote_config::fetch::ConfigInvariants;
use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats};
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
use ddcommon::tag::Tag;
use tinybytes;
Expand All @@ -69,6 +69,7 @@ struct SidecarStats {
enqueued_apps: u32,
enqueued_telemetry_data: EnqueuedTelemetryStats,
remote_config_clients: u32,
remote_configs: MultiTargetStats,
telemetry_metrics_contexts: u32,
telemetry_worker: TelemetryWorkerStats,
telemetry_worker_errors: u32,
Expand Down Expand Up @@ -385,6 +386,7 @@ impl SidecarServer {
.sum::<u32>()
})
.sum(),
remote_configs: self.remote_configs.stats(),
telemetry_metrics_contexts: sessions
.values()
.map(|s| {
Expand Down Expand Up @@ -694,6 +696,7 @@ impl SidecarInterface for SidecarServer {
products: config.remote_config_products,
capabilities: config.remote_config_capabilities,
});
*session.remote_config_interval.lock().unwrap() = config.remote_config_poll_interval;
self.trace_flusher
.interval_ms
.store(config.flush_interval.as_millis() as u64, Ordering::Relaxed);
Expand Down Expand Up @@ -881,6 +884,7 @@ impl SidecarInterface for SidecarServer {
.as_ref()
.expect("Expecting remote config invariants to be set early")
.clone(),
*session.remote_config_interval.lock().unwrap(),
instance_id.runtime_id,
notify_target,
env_name.clone(),
Expand Down
22 changes: 14 additions & 8 deletions sidecar/src/shm_remote_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use datadog_ipc::platform::{FileBackedHandle, MappedMem, NamedShmHandle};
use datadog_ipc::rate_limiter::ShmLimiter;
use datadog_remote_config::fetch::{
ConfigInvariants, FileRefcountData, FileStorage, MultiTargetFetcher, MultiTargetHandlers,
NotifyTarget, RefcountedFile,
MultiTargetStats, NotifyTarget, RefcountedFile,
};
use datadog_remote_config::{RemoteConfigPath, RemoteConfigProduct, RemoteConfigValue, Target};
use priority_queue::PriorityQueue;
Expand Down Expand Up @@ -270,19 +270,20 @@ pub struct ShmRemoteConfigs<N: NotifyTarget + 'static>(
// pertaining to that env refcounting RemoteConfigIdentifier tuples by their unique runtime_id

impl<N: NotifyTarget + 'static> ShmRemoteConfigs<N> {
pub fn new(invariants: ConfigInvariants, on_dead: Box<dyn FnOnce() + Sync + Send>) -> Self {
let is_test = invariants.endpoint.test_token.is_some();
pub fn new(
invariants: ConfigInvariants,
on_dead: Box<dyn FnOnce() + Sync + Send>,
interval: Duration,
) -> Self {
let storage = ConfigFileStorage {
invariants: invariants.clone(),
writers: Default::default(),
on_dead: Arc::new(Mutex::new(Some(on_dead))),
};
let fetcher = MultiTargetFetcher::new(storage, invariants);
if is_test {
fetcher
.remote_config_interval
.store(10_000_000, Ordering::Relaxed);
}
fetcher
.remote_config_interval
.store(interval.as_nanos() as u64, Ordering::Relaxed);
ShmRemoteConfigs(fetcher)
}

Expand Down Expand Up @@ -315,6 +316,10 @@ impl<N: NotifyTarget + 'static> ShmRemoteConfigs<N> {
pub fn shutdown(&self) {
self.0.shutdown();
}

pub fn stats(&self) -> MultiTargetStats {
self.0.stats()
}
}

fn read_config(path: &str) -> anyhow::Result<(RemoteConfigValue, u32)> {
Expand Down Expand Up @@ -586,6 +591,7 @@ mod tests {
Box::new(|| {
tokio::spawn(on_dead_completer.complete(()));
}),
Duration::from_millis(10),
);

let mut manager = RemoteConfigManager::new(server.dummy_invariants());
Expand Down

0 comments on commit 7f40ba3

Please sign in to comment.