appender: fix WorkerGuard not waiting for writer destruction (#1713)
## Motivation

Can be thought of as a continuation of #1120 and #1125.

An example that exhibits the racy behavior:
```
use std::io::Write;

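// Wrapper that prints a message when the inner writer is dropped, so that
// writer destruction is observable.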
struct TestDrop<T: Write>(T);

impl<T: Write> Drop for TestDrop<T> {
    fn drop(&mut self) {
        println!("Dropped");
    }
}

impl<T: Write> Write for TestDrop<T> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.write(buf)
    }
    fn flush(&mut self) -> std::io::Result<()> {
        self.0.flush()
    }
}

fn main() {
    let writer = TestDrop(std::io::stdout());
    let (non_blocking, _guard) = tracing_appender::non_blocking(writer);
    tracing_subscriber::fmt().with_writer(non_blocking).init();
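    // `_guard` is dropped at the end of `main`; its destructor should shut
    // down the worker and drop the wrapped writer, printing "Dropped".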
}
```

Running this test case in a loop with `while ./test | grep Dropped; do
done` shows that the writer (`TestDrop`) is sometimes not dropped, so the
message is not printed. This happens because the `Worker` thread owns the
writer and the process can exit before that thread drops it: `WorkerGuard`
held the thread's `JoinHandle` but never joined it. Proper destruction of
the non-blocking writer should also destroy the underlying writer.

## Solution

The solution joins the `Worker` thread (which owns the writer), after first
waiting for it to almost finish so as to avoid the potential deadlock
described in #1120 (comment).
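
For illustration, here is a minimal sketch of the handshake this relies on,
using the `crossbeam-channel` crate (the channel implementation
`tracing-appender` uses internally); the names (`shutdown_tx`, `worker`) and
the surrounding `main` are invented for the example. On a zero-capacity
channel a send only completes while the receiver is blocked in `recv()`, so
once the guard's send succeeds, the worker has already dropped the writer and
joining it cannot block:

```
use std::thread;
use std::time::Duration;

use crossbeam_channel::{bounded, SendTimeoutError};

fn main() {
    // Zero-capacity (rendezvous) channel: `send` completes only while the
    // other side is blocked in `recv()`.
    let (shutdown_tx, shutdown_rx) = bounded::<()>(0);

    let worker = thread::spawn(move || {
        // ... drain queued messages, then drop the writer here ...
        // Park on the rendezvous channel to signal that the writer is gone.
        let _ = shutdown_rx.recv();
    });

    // Guard side: this send completes only once the worker is parked in
    // `recv()`, i.e. after the writer has been dropped, so the join below
    // cannot block on writer destruction.
    match shutdown_tx.send_timeout((), Duration::from_millis(1000)) {
        Err(SendTimeoutError::Timeout(_)) => eprintln!("worker shutdown timed out"),
        _ => {
            if worker.join().is_err() {
                eprintln!("worker thread panicked");
            }
        }
    }
}
```

The timeout keeps `drop` from blocking indefinitely if the worker is stuck;
joining happens only on the success path, where it is known to return
promptly.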
trtt authored Nov 13, 2021
1 parent c6055aa commit b439705
Showing 2 changed files with 26 additions and 14 deletions.
34 changes: 24 additions & 10 deletions tracing-appender/src/non_blocking.rs
```diff
@@ -103,7 +103,7 @@ pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;
 #[must_use]
 #[derive(Debug)]
 pub struct WorkerGuard {
-    _guard: Option<JoinHandle<()>>,
+    handle: Option<JoinHandle<()>>,
     sender: Sender<Msg>,
     shutdown: Sender<()>,
 }
@@ -259,7 +259,7 @@ impl<'a> MakeWriter<'a> for NonBlocking {
 impl WorkerGuard {
     fn new(handle: JoinHandle<()>, sender: Sender<Msg>, shutdown: Sender<()>) -> Self {
         WorkerGuard {
-            _guard: Some(handle),
+            handle: Some(handle),
             sender,
             shutdown,
         }
@@ -268,21 +268,35 @@ impl WorkerGuard {
 
 impl Drop for WorkerGuard {
     fn drop(&mut self) {
-        match self
-            .sender
-            .send_timeout(Msg::Shutdown, Duration::from_millis(100))
-        {
+        let timeout = Duration::from_millis(100);
+        match self.sender.send_timeout(Msg::Shutdown, timeout) {
             Ok(_) => {
                 // Attempt to wait for `Worker` to flush all messages before dropping. This happens
                 // when the `Worker` calls `recv()` on a zero-capacity channel. Use `send_timeout`
                 // so that drop is not blocked indefinitely.
                 // TODO: Make timeout configurable.
-                let _ = self.shutdown.send_timeout((), Duration::from_millis(1000));
+                let timeout = Duration::from_millis(1000);
+                match self.shutdown.send_timeout((), timeout) {
+                    Err(SendTimeoutError::Timeout(_)) => {
+                        eprintln!(
+                            "Shutting down logging worker timed out after {:?}.",
+                            timeout
+                        );
+                    }
+                    _ => {
+                        // At this point it is safe to wait for `Worker` destruction without blocking
+                        if let Some(handle) = self.handle.take() {
+                            if handle.join().is_err() {
+                                eprintln!("Logging worker thread panicked");
+                            }
+                        };
+                    }
+                }
             }
             Err(SendTimeoutError::Disconnected(_)) => (),
-            Err(SendTimeoutError::Timeout(e)) => println!(
-                "Failed to send shutdown signal to logging worker. Error: {:?}",
-                e
+            Err(SendTimeoutError::Timeout(_)) => eprintln!(
+                "Sending shutdown signal to logging worker timed out after {:?}",
+                timeout
             ),
         }
     }
```
6 changes: 2 additions & 4 deletions tracing-appender/src/worker.rs
```diff
@@ -73,17 +73,15 @@ impl<T: Write + Send + Sync + 'static> Worker<T> {
                 match self.work() {
                     Ok(WorkerState::Continue) | Ok(WorkerState::Empty) => {}
                     Ok(WorkerState::Shutdown) | Ok(WorkerState::Disconnected) => {
+                        drop(self.writer); // drop now in case it blocks
                         let _ = self.shutdown.recv();
-                        break;
+                        return;
                     }
                     Err(_) => {
                         // TODO: Expose a metric for IO Errors, or print to stderr
                     }
                 }
             }
-            if let Err(e) = self.writer.flush() {
-                eprintln!("Failed to flush. Error: {}", e);
-            }
         })
     }
 }
```
