Skip to content

Commit

Permalink
Lock around the sender
Browse files Browse the repository at this point in the history
  • Loading branch information
OmegaJak authored and inikulin committed Aug 13, 2024
1 parent 6264d3c commit d319361
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions foundations/src/telemetry/memory_profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use std::io::Read;
use std::os::raw::c_char;
use std::sync::mpsc::{self};
use tempfile::NamedTempFile;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Mutex as AsyncMutex};

static PROFILER: OnceCell<Option<MemoryProfiler>> = OnceCell::new();
static HEAP_PROFILE_REQUEST_SENDER: OnceCell<
AsyncMutex<mpsc::Sender<oneshot::Sender<Result<String>>>>,
> = OnceCell::new();

mod control {
use super::*;
Expand Down Expand Up @@ -41,12 +44,16 @@ mod control {
}
}

// NOTE: prevent direct construction by the external code.
#[derive(Copy, Clone)]
struct Seal;

/// A safe interface for [jemalloc]'s memory profiling functionality.
///
/// [jemalloc]: https://github.com/jemalloc/jemalloc
#[derive(Clone)]
#[derive(Copy, Clone)]
pub struct MemoryProfiler {
request_heap_profile: mpsc::Sender<oneshot::Sender<Result<String>>>,
_seal: Seal,
}

impl MemoryProfiler {
Expand All @@ -71,7 +78,7 @@ impl MemoryProfiler {

PROFILER
.get_or_try_init(|| init_profiler(settings))
.cloned()
.copied()
}

/// Returns a heap profile.
Expand All @@ -98,8 +105,17 @@ impl MemoryProfiler {
/// }
/// ```
pub async fn heap_profile(&self) -> Result<String> {
let Some(sender_mutex) = HEAP_PROFILE_REQUEST_SENDER.get() else {
return Err("Profile request sender is not initialized".into());
};

// NOTE: we use tokio mutex here, so we can hold the lock across `await` points.
let Ok(sender_guard) = sender_mutex.try_lock() else {
return Err("profiling is already in progress".into());
};

let (response_sender, response_receiver) = oneshot::channel();
self.request_heap_profile.send(response_sender)?;
sender_guard.send(response_sender)?;

response_receiver.await?
}
Expand Down Expand Up @@ -139,6 +155,9 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
}

let (request_sender, request_receiver) = mpsc::channel();
HEAP_PROFILE_REQUEST_SENDER
.set(AsyncMutex::new(request_sender))
.map_err(|_| anyhow::anyhow!("request sender had already been initialized"))?;

std::thread::spawn(move || heap_profile_thread(request_receiver));

Expand All @@ -152,9 +171,7 @@ fn init_profiler(settings: &MemoryProfilerSettings) -> BootstrapResult<Option<Me
control::write(control::PROF_ACTIVE, true)
.map_err(|e| BootstrapError::new(e).context("failed to activate profiling"))?;

Ok(Some(MemoryProfiler {
request_heap_profile: request_sender,
}))
Ok(Some(MemoryProfiler { _seal: Seal }))
}

fn heap_profile_thread(receive_request: mpsc::Receiver<oneshot::Sender<Result<String>>>) {
Expand Down Expand Up @@ -195,14 +212,9 @@ fn collect_heap_profile() -> Result<String> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
security::{
allow_list,
common_syscall_allow_lists::{ASYNC, SERVICE_BASICS},
enable_syscall_sandboxing, ViolationAction,
},
telemetry::settings::MemoryProfilerSettings,
};
use crate::security::common_syscall_allow_lists::{ASYNC, SERVICE_BASICS};
use crate::security::{allow_list, enable_syscall_sandboxing, ViolationAction};
use crate::telemetry::settings::MemoryProfilerSettings;

#[test]
fn sample_interval_out_of_bounds() {
Expand Down

0 comments on commit d319361

Please sign in to comment.