Skip to content

Commit

Permalink
Merge #1721
Browse files Browse the repository at this point in the history
1721: [ Cherry-Pick ] Fix stuck rebuilds and stuck nexus subsystems r=tiagolobocastro a=tiagolobocastro

    fix(nvmx/retire): disconnect failed controllers
    
    When we are pausing the nexus, all IO must get flushed before
    the subsystem pausing completes.
    If we can't flush the IO then pausing is stuck forever...
    
    The issue we have seen is that when IO's are stuck there's
    nothing which can fail them and allow pause to complete.
    One way this can happen is when the controller is failed as
    it seems in this case the io queues are not getting polled.
    
    A first fix that can be done is to piggy back on the adminq
    polling failure and use this to drive the removal of the
    failed child devices from the nexus per-core channels.
    
    A better approach might be needed in the future to be able
    to timeout the IOs even when no completions are processed
    in a given I/O qpair.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix(opts): convert adminq poll period to us
    
    This seems to have been mistakenly added as ms.
    In practice this would have caused no harm as this value is not
    currently being overrided by the helm chart.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix(rebuild): ensure comms channel is drained on drop
    
    When the rebuild backend is dropped, we must also drain the async channel.
    This covers a corner case where a message may be sent at the same time as
    we're dropping and in this case the message would hang.
    
    This is not a hang for prod as there we have timeouts which would
    eventually cancel the future and allow the drop, though this can still
    lead to timeouts and confusion.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>


Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Aug 27, 2024
2 parents 6939b15 + 428121f commit 08cdf74
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 22 deletions.
50 changes: 43 additions & 7 deletions io-engine-tests/src/compose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ use std::future::Future;
use tokio::sync::oneshot::channel;

use crate::mayastor_test_init_ex;
use io_engine::core::{
mayastor_env_stop,
MayastorCliArgs,
MayastorEnvironment,
Reactor,
Reactors,
GLOBAL_RC,
use io_engine::{
core::{
device_monitor_loop,
mayastor_env_stop,
runtime,
MayastorCliArgs,
MayastorEnvironment,
ProtectedSubsystems,
Reactor,
Reactors,
ResourceLockManager,
ResourceLockManagerConfig,
GLOBAL_RC,
},
grpc,
};
use std::time::Duration;

Expand Down Expand Up @@ -99,6 +107,34 @@ impl<'a> MayastorTest<'a> {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

/// Starts the device monitor loop which is required to fully
/// remove devices when they are not in use.
pub fn start_device_monitor(&self) {
runtime::spawn(device_monitor_loop());
}

/// Start the gRPC server which can be useful to debug tests.
pub fn start_grpc(&self) {
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::POOL, 32)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

let env = MayastorEnvironment::global_or_default();
runtime::spawn(async {
grpc::MayastorGrpcServer::run(
&env.node_name,
&env.node_nqn,
env.grpc_endpoint.unwrap(),
env.rpc_addr,
env.api_versions,
)
.await
.ok();
});
}
}

impl<'a> Drop for MayastorTest<'a> {
Expand Down
34 changes: 32 additions & 2 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::{
Nexus,
NexusChild,
NexusOperation,
NexusPauseState,
NexusState,
NexusStatus,
PersistOp,
Expand Down Expand Up @@ -787,6 +788,13 @@ impl<'n> DeviceEventListener for Nexus<'n> {
false,
);
}
DeviceEventType::AdminQNoticeCtrlFailed => {
Reactors::master().send_future(Nexus::disconnect_failed_child(
self.name.clone(),
dev_name.to_owned(),
));
}

_ => {
warn!(
"{:?}: ignoring event '{:?}' for device '{}'",
Expand Down Expand Up @@ -917,6 +925,28 @@ impl<'n> Nexus<'n> {
}
}

/// Disconnect a failed child from the given nexus.
async fn disconnect_failed_child(nexus_name: String, dev: String) {
let Some(nex) = nexus_lookup_mut(&nexus_name) else {
warn!(
"Nexus '{nexus_name}': retiring failed device '{dev}': \
nexus already gone"
);
return;
};

info!("Nexus '{nexus_name}': disconnect handlers for controller failed device: '{dev}'");

if nex.io_subsystem_state() == Some(NexusPauseState::Pausing) {
nex.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| {
h.get_device().device_name() == dev && h.is_ctrlr_failed()
});
})
.await;
}
}

/// Retires a child device for the given nexus.
async fn child_retire_routine(
nexus_name: String,
Expand Down Expand Up @@ -981,12 +1011,12 @@ impl<'n> Nexus<'n> {
// channels, and all I/Os failing due to this device will eventually
// resubmit and succeeded (if any healthy children are left).
//
// Device disconnection is done in two steps (detach, than disconnect)
// Device disconnection is done in two steps (detach, then disconnect)
// in order to prevent an I/O race when retiring a device.
self.detach_device(&dev).await;

// Disconnect the devices with failed controllers _before_ pause,
// otherwise pause would stuck. Keep all controoled that are _not_
// otherwise pause would get stuck. Keep all controllers which are _not_
// failed (e.g., in the case I/O failed due to ENOSPC).
self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| h.is_ctrlr_failed());
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ impl<'n> Debug for NexusChannel<'n> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
io = if self.is_io_chan { "I/O" } else { "Aux" },
nex = self.nexus.nexus_name(),
core = self.core,
cur = Cores::current(),
r = self.readers.len(),
w = self.writers.len(),
d = self.detached.len(),
l = self.io_logs.len(),
c = self.nexus.child_count(),
)
Expand Down
12 changes: 9 additions & 3 deletions io-engine/src/bdev/nvmx/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,13 +843,14 @@ pub extern "C" fn nvme_poll_adminq(ctx: *mut c_void) -> i32 {
if result < 0 {
if context.start_device_destroy() {
error!(
"process adminq: {}: {}",
"process adminq: {}: ctrl failed: {}, error: {}",
context.name,
context.is_failed(),
Errno::from_i32(result.abs())
);
info!("dispatching nexus fault and retire: {}", context.name);
let dev_name = context.name.to_string();
let carc = NVME_CONTROLLERS.lookup_by_name(&dev_name).unwrap();
let dev_name = context.name.as_str();
let carc = NVME_CONTROLLERS.lookup_by_name(dev_name).unwrap();
debug!(
?dev_name,
"notifying listeners of admin command completion failure"
Expand All @@ -863,6 +864,11 @@ pub extern "C" fn nvme_poll_adminq(ctx: *mut c_void) -> i32 {
?num_listeners,
"listeners notified of admin command completion failure"
);
} else if context.report_failed() {
if let Some(carc) = NVME_CONTROLLERS.lookup_by_name(&context.name) {
carc.lock()
.notify_listeners(DeviceEventType::AdminQNoticeCtrlFailed);
}
}
return 1;
}
Expand Down
15 changes: 15 additions & 0 deletions io-engine/src/bdev/nvmx/controller_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub(crate) struct TimeoutConfig {
reset_attempts: u32,
next_reset_time: Instant,
destroy_in_progress: AtomicCell<bool>,
report_failed: AtomicCell<bool>,
}

impl Drop for TimeoutConfig {
Expand All @@ -94,6 +95,7 @@ impl TimeoutConfig {
reset_attempts: MAX_RESET_ATTEMPTS,
next_reset_time: Instant::now(),
destroy_in_progress: AtomicCell::new(false),
report_failed: AtomicCell::new(true),
}
}

Expand All @@ -116,6 +118,19 @@ impl TimeoutConfig {
}
}

/// Check if the SPDK's nvme controller is failed.
pub fn is_failed(&self) -> bool {
self.ctrlr.is_failed
}
/// Check if we need to report the controller failure.
/// We only report this failure once.
pub fn report_failed(&mut self) -> bool {
if !self.is_failed() {
return false;
}
self.report_failed.compare_exchange(true, false).is_ok()
}

fn reset_cb(success: bool, ctx: *mut c_void) {
let timeout_ctx = TimeoutConfig::from_ptr(ctx as *mut TimeoutConfig);

Expand Down
8 changes: 7 additions & 1 deletion io-engine/src/core/device_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ pub enum DeviceEventType {
DeviceResized,
/// TODO
MediaManagement,
/// TODO
/// Sent when admin q polling fails for the first time.
AdminCommandCompletionFailed,
/// When the adminq poll fails the first time, the controller may not yet
/// be failed.
/// Next time the admin q poll fails, if the controller is noticed as
/// failed for the first time, this event is sent, allowing further
/// clean up to be performed.
AdminQNoticeCtrlFailed,
}

/// TODO
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ type Result<T, E = EnvError> = std::result::Result<T, E>;
#[allow(dead_code)]
pub struct MayastorEnvironment {
pub node_name: String,
node_nqn: Option<String>,
pub node_nqn: Option<String>,
pub grpc_endpoint: Option<std::net::SocketAddr>,
pub registration_endpoint: Option<Uri>,
ps_endpoint: Option<String>,
Expand Down Expand Up @@ -421,7 +421,7 @@ pub struct MayastorEnvironment {
nvmf_tgt_interface: Option<String>,
/// NVMF target Command Retry Delay in x100 ms.
pub nvmf_tgt_crdt: [u16; TARGET_CRDT_LEN],
api_versions: Vec<ApiVersion>,
pub api_versions: Vec<ApiVersion>,
skip_sig_handler: bool,
enable_io_all_thrd_nexus_channels: bool,
developer_delay: bool,
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RebuildJob {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RebuildFBendChan {
sender: async_channel::Sender<RebuildJobRequest>,
}
Expand Down
21 changes: 19 additions & 2 deletions io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl RebuildJobBackendManager {
}
}

/// Reply back to the requester with the generic rebuild stats.
/// Reply to the requester with the generic rebuild stats.
async fn reply_stats(
&mut self,
requester: oneshot::Sender<RebuildStats>,
Expand Down Expand Up @@ -488,10 +488,27 @@ impl RebuildJobBackendManager {
}

impl Drop for RebuildJobBackendManager {
/// Close and drain comms channel allowing sender to see the cancellation
/// error, should it attempt to communicate.
/// This is required because it seems if a message was already sent then it
/// will not get dropped until both the receivers and the senders are
/// dropped.
fn drop(&mut self) {
// set final stats now so failed stats requesters can still get stats.
let stats = self.stats();
info!("{self}: backend dropped; final stats: {stats:?}");
self.states.write().set_final_stats(stats);
self.states.write().set_final_stats(stats.clone());

// we close before draining, ensuring no new messages can be sent
self.info_chan.receiver.close();
// now we can drain, and we could just ignore, but let's try to
// reply to any stats requests
while let Ok(message) = self.info_chan.receiver.try_recv() {
if let RebuildJobRequest::GetStats(reply) = message {
reply.send(stats.clone()).ok();
}
}

for sender in self.complete_chan.lock().drain(..) {
sender.send(self.state()).ok();
}
Expand Down
6 changes: 4 additions & 2 deletions io-engine/src/rebuild/rebuild_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ impl RebuildStates {
}
/// Set the final rebuild statistics.
pub(super) fn set_final_stats(&mut self, mut stats: RebuildStats) {
stats.end_time = Some(Utc::now());
self.final_stats = Some(stats);
if self.final_stats.is_none() {
stats.end_time = Some(Utc::now());
self.final_stats = Some(stats);
}
}

/// Set's the next pending state
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/subsys/config/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl Default for NvmeBdevOpts {
nvme_adminq_poll_period_us: time_try_from_env(
"NVME_ADMINQ_POLL_PERIOD",
1_000,
TimeUnit::MilliSeconds,
TimeUnit::MicroSeconds,
),
nvme_ioq_poll_period_us: time_try_from_env(
"NVME_IOQ_POLL_PERIOD",
Expand Down

0 comments on commit 08cdf74

Please sign in to comment.