diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs index 491427dfa..c068ed0b7 100644 --- a/io-engine/src/rebuild/rebuild_job.rs +++ b/io-engine/src/rebuild/rebuild_job.rs @@ -320,7 +320,7 @@ impl RebuildJob { } } -#[derive(Debug, Clone)] +#[derive(Debug)] struct RebuildFBendChan { sender: async_channel::Sender, } diff --git a/io-engine/src/rebuild/rebuild_job_backend.rs b/io-engine/src/rebuild/rebuild_job_backend.rs index 8aae76f79..1d571dd5a 100644 --- a/io-engine/src/rebuild/rebuild_job_backend.rs +++ b/io-engine/src/rebuild/rebuild_job_backend.rs @@ -336,7 +336,7 @@ impl RebuildJobBackendManager { } } - /// Reply back to the requester with the generic rebuild stats. + /// Reply to the requester with the generic rebuild stats. async fn reply_stats( &mut self, requester: oneshot::Sender, @@ -488,12 +488,32 @@ impl RebuildJobBackendManager { } impl Drop for RebuildJobBackendManager { + /// Close and drain comms channel allowing sender to see the cancellation + /// error, should it attempt to communicate. + /// This is required because it seems if a message was already sent then it + /// will not get dropped until both the receivers and the senders are + /// dropped. fn drop(&mut self) { + // set final stats now so failed stats requesters can still get stats. let stats = self.stats(); info!("{self}: backend dropped; final stats: {stats:?}"); - self.states.write().set_final_stats(stats); + self.states.write().set_final_stats(stats.clone()); + for sender in self.complete_chan.lock().drain(..) { sender.send(self.state()).ok(); } + + let chan = self.info_chan.receiver.clone(); + Reactors::master().send_future(async move { + // we close before draining, ensuring no new messages can be sent + chan.close(); + // now we can drain, and we could just ignore, but let's try to + // reply to any stats requests + while let Ok(message) = chan.recv().await { + if let RebuildJobRequest::GetStats(reply) = message { + reply.send(stats.clone()).ok(); + } + } + }) } } diff --git a/io-engine/src/rebuild/rebuild_state.rs b/io-engine/src/rebuild/rebuild_state.rs index 7dde6c9de..0a417ba4c 100644 --- a/io-engine/src/rebuild/rebuild_state.rs +++ b/io-engine/src/rebuild/rebuild_state.rs @@ -74,8 +74,10 @@ impl RebuildStates { } /// Set the final rebuild statistics. pub(super) fn set_final_stats(&mut self, mut stats: RebuildStats) { - stats.end_time = Some(Utc::now()); - self.final_stats = Some(stats); + if self.final_stats.is_none() { + stats.end_time = Some(Utc::now()); + self.final_stats = Some(stats); + } } /// Set's the next pending state