diff --git a/tracing-journald/Cargo.toml b/tracing-journald/Cargo.toml index 4e52ced32f..ef895bd8c5 100644 --- a/tracing-journald/Cargo.toml +++ b/tracing-journald/Cargo.toml @@ -16,6 +16,7 @@ keywords = ["tracing", "journald"] rust-version = "1.42.0" [dependencies] +libc = "0.2.107" tracing-core = { path = "../tracing-core", version = "0.1.10" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3" } diff --git a/tracing-journald/src/lib.rs b/tracing-journald/src/lib.rs index 54068f0f3d..06dc45e812 100644 --- a/tracing-journald/src/lib.rs +++ b/tracing-journald/src/lib.rs @@ -49,6 +49,11 @@ use tracing_core::{ }; use tracing_subscriber::{layer::Context, registry::LookupSpan}; +#[cfg(target_os = "linux")] +mod memfd; +#[cfg(target_os = "linux")] +mod socket; + /// Sends events and their fields to journald /// /// [journald conventions] for structured field names differ from typical tracing idioms, and journald @@ -109,6 +114,48 @@ impl Layer { self.field_prefix = x; self } + + #[cfg(not(unix))] + fn send_payload(&self, _opayload: &[u8]) -> io::Result<()> { + Err(io::Error::new( + io::ErrorKind::Unsupported, + "journald not supported on non-Unix", + )) + } + + #[cfg(unix)] + fn send_payload(&self, payload: &[u8]) -> io::Result { + self.socket.send(payload).or_else(|error| { + if Some(libc::EMSGSIZE) == error.raw_os_error() { + self.send_large_payload(payload) + } else { + Err(error) + } + }) + } + + #[cfg(all(unix, not(target_os = "linux")))] + fn send_large_payload(&self, _payload: &[u8]) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Unsupported, + "Large payloads not supported on non-Linux OS", + )) + } + + /// Send large payloads to journald via a memfd. 
+ #[cfg(target_os = "linux")] + fn send_large_payload(&self, payload: &[u8]) -> io::Result { + // If the payload's too large for a single datagram, send it through a memfd, see + // https://systemd.io/JOURNAL_NATIVE_PROTOCOL/ + use std::os::unix::prelude::AsRawFd; + // Write the whole payload to a memfd + let mut mem = memfd::create_sealable()?; + mem.write_all(payload)?; + // Fully seal the memfd to signal journald that its backing data won't resize anymore + // and so is safe to mmap. + memfd::seal_fully(mem.as_raw_fd())?; + socket::send_one_fd(&self.socket, mem.as_raw_fd()) + } } /// Construct a journald layer @@ -174,9 +221,8 @@ where self.field_prefix.as_ref().map(|x| &x[..]), )); - // What could we possibly do on error? - #[cfg(unix)] - let _ = self.socket.send(&buf); + // At this point we can't handle the error anymore so just ignore it. + let _ = self.send_payload(&buf); } } diff --git a/tracing-journald/src/memfd.rs b/tracing-journald/src/memfd.rs new file mode 100644 index 0000000000..5292db2928 --- /dev/null +++ b/tracing-journald/src/memfd.rs @@ -0,0 +1,31 @@ +//! memfd helpers. 
+ +use libc::*; +use std::fs::File; +use std::io::Error; +use std::io::Result; +use std::os::raw::c_uint; +use std::os::unix::prelude::{FromRawFd, RawFd}; + +fn create(flags: c_uint) -> Result { + let fd = unsafe { memfd_create("tracing-journald\0".as_ptr() as *const c_char, flags) }; + if fd < 0 { + Err(Error::last_os_error()) + } else { + Ok(unsafe { File::from_raw_fd(fd as RawFd) }) + } +} + +pub fn create_sealable() -> Result { + create(MFD_ALLOW_SEALING | MFD_CLOEXEC) +} + +pub fn seal_fully(fd: RawFd) -> Result<()> { + let all_seals = F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_WRITE | F_SEAL_SEAL; + let result = unsafe { fcntl(fd, F_ADD_SEALS, all_seals) }; + if result < 0 { + Err(Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/tracing-journald/src/socket.rs b/tracing-journald/src/socket.rs new file mode 100644 index 0000000000..2b38a84859 --- /dev/null +++ b/tracing-journald/src/socket.rs @@ -0,0 +1,66 @@ +//! socket helpers. + +use std::io::{Error, Result}; +use std::mem::{size_of, zeroed}; +use std::os::unix::net::UnixDatagram; +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::ptr; + +use libc::*; + +const CMSG_BUFSIZE: usize = 64; + +#[repr(C)] +union AlignedBuffer { + buffer: T, + align: cmsghdr, +} + +fn assert_cmsg_bufsize() { + let space_one_fd = unsafe { CMSG_SPACE(size_of::() as u32) }; + assert!( + space_one_fd <= CMSG_BUFSIZE as u32, + "cmsghdr buffer too small (< {}) to hold a single fd", + space_one_fd + ); +} + +#[cfg(test)] +#[test] +fn cmsg_buffer_size_for_one_fd() { + assert_cmsg_bufsize() +} + +pub fn send_one_fd(socket: &UnixDatagram, fd: RawFd) -> Result { + assert_cmsg_bufsize(); + + let mut cmsg_buffer = AlignedBuffer { + buffer: ([0u8; CMSG_BUFSIZE]), + }; + let mut msg: msghdr = unsafe { zeroed() }; + + // We send no data body with this message. 
+ msg.msg_iov = ptr::null_mut(); + msg.msg_iovlen = 0; + + msg.msg_control = unsafe { cmsg_buffer.buffer.as_mut_ptr() as _ }; + msg.msg_controllen = unsafe { CMSG_SPACE(size_of::() as _) as _ }; + + let mut cmsg: &mut cmsghdr = + unsafe { CMSG_FIRSTHDR(&msg).as_mut() }.expect("Control message buffer exhausted"); + + cmsg.cmsg_level = SOL_SOCKET; + cmsg.cmsg_type = SCM_RIGHTS; + cmsg.cmsg_len = unsafe { CMSG_LEN(size_of::() as _) as _ }; + + unsafe { ptr::write(CMSG_DATA(cmsg) as *mut RawFd, fd) }; + + let result = unsafe { sendmsg(socket.as_raw_fd(), &msg, libc::MSG_NOSIGNAL) }; + + if result < 0 { + Err(Error::last_os_error()) + } else { + // sendmsg returns the number of bytes written + Ok(result as usize) + } +} diff --git a/tracing-journald/tests/journal.rs b/tracing-journald/tests/journal.rs index bc58a061f2..25c1412e5d 100644 --- a/tracing-journald/tests/journal.rs +++ b/tracing-journald/tests/journal.rs @@ -54,14 +54,14 @@ impl PartialEq<[u8]> for Field { } } -/// Retry `f` 10 times 100ms apart. +/// Retry `f` 30 times 100ms apart, i.e. a total of three seconds. /// -/// When `f` returns an error wait 100ms and try it again, up to ten times. +/// When `f` returns an error wait 100ms and try it again, up to 30 times. /// If the last attempt failed return the error returned by that attempt. /// /// If `f` returns Ok immediately return the result. 
fn retry(f: impl Fn() -> Result) -> Result { - let attempts = 10; + let attempts = 30; let interval = Duration::from_millis(100); for attempt in (0..attempts).rev() { match f() { @@ -85,7 +85,8 @@ fn retry(f: impl Fn() -> Result) -> Result { fn read_from_journal(test_name: &str) -> Vec> { let stdout = String::from_utf8( Command::new("journalctl") - .args(&["--user", "--output=json"]) + // We pass --all to circumvent journalctl's default limit of 4096 bytes for field values + .args(&["--user", "--output=json", "--all"]) // Filter by the PID of the current test process .arg(format!("_PID={}", std::process::id())) .arg(format!("TEST_NAME={}", test_name)) @@ -97,10 +98,7 @@ fn read_from_journal(test_name: &str) -> Vec> { stdout .lines() - .map(|l| { - dbg!(l); - serde_json::from_str(l).unwrap() - }) + .map(|l| serde_json::from_str(l).unwrap()) .collect() } @@ -169,3 +167,18 @@ fn internal_null_byte() { assert_eq!(message["PRIORITY"], "6"); }); } + +#[test] +fn large_message() { + let large_string = "b".repeat(512_000); + with_journald(|| { + debug!(test.name = "large_message", "Message: {}", large_string); + + let message = retry_read_one_line_from_journal("large_message"); + assert_eq!( + message["MESSAGE"], + format!("Message: {}", large_string).as_str() + ); + assert_eq!(message["PRIORITY"], "6"); + }); +}