Skip to content

Commit

Permalink
fix(rebuild): ensure comms channel is drained on drop
Browse files Browse the repository at this point in the history
When the rebuild backend is dropped, we must also drain the async channel.
This covers a corner case where a message may be sent at the same time as
we're dropping and in this case the message would hang.

This is not a hang for prod as there we have timeouts which would
eventually cancel the future and allow the drop, though this can still
lead to timeouts and confusion.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Aug 14, 2024
1 parent 6939b15 commit 3148374
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
2 changes: 1 addition & 1 deletion io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RebuildJob {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RebuildFBendChan {
sender: async_channel::Sender<RebuildJobRequest>,
}
Expand Down
24 changes: 22 additions & 2 deletions io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl RebuildJobBackendManager {
}
}

/// Reply back to the requester with the generic rebuild stats.
/// Reply to the requester with the generic rebuild stats.
async fn reply_stats(
&mut self,
requester: oneshot::Sender<RebuildStats>,
Expand Down Expand Up @@ -488,12 +488,32 @@ impl RebuildJobBackendManager {
}

impl Drop for RebuildJobBackendManager {
/// Close and drain comms channel allowing sender to see the cancellation
/// error, should it attempt to communicate.
/// This is required because it seems if a message was already sent then it
/// will not get dropped until both the receivers and the senders are
/// dropped.
fn drop(&mut self) {
// set final stats now so failed stats requesters can still get stats.
let stats = self.stats();
info!("{self}: backend dropped; final stats: {stats:?}");
self.states.write().set_final_stats(stats);
self.states.write().set_final_stats(stats.clone());

for sender in self.complete_chan.lock().drain(..) {
sender.send(self.state()).ok();
}

let chan = self.info_chan.receiver.clone();
Reactors::master().send_future(async move {
// we close before draining, ensuring no new messages can be sent
chan.close();
// now we can drain, and we could just ignore, but let's try to
// reply to any stats requests
while let Ok(message) = chan.recv().await {
if let RebuildJobRequest::GetStats(reply) = message {
reply.send(stats.clone()).ok();
}
}
})
}
}
6 changes: 4 additions & 2 deletions io-engine/src/rebuild/rebuild_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ impl RebuildStates {
}
/// Set the final rebuild statistics.
pub(super) fn set_final_stats(&mut self, mut stats: RebuildStats) {
stats.end_time = Some(Utc::now());
self.final_stats = Some(stats);
if self.final_stats.is_none() {
stats.end_time = Some(Utc::now());
self.final_stats = Some(stats);
}
}

/// Set's the next pending state
Expand Down

0 comments on commit 3148374

Please sign in to comment.