Merge #1717
1717: Cherry-pick 1711 - reactor block_on is harmful as it delays message polling r=tiagolobocastro a=tiagolobocastro

Reactor block_on may prevent SPDK thread messages from running: messages already pulled from the thread ring are not polled while block_on is in progress, which can lead to their starvation.
There are still a few uses remaining, most of them during init/setup and therefore mostly harmless, though Nexus Bdev destruction, which runs in blocking code, still contains a block_on.
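
To make the hazard concrete, here is a toy, self-contained Rust sketch (not io-engine code): a single-threaded message loop, standing in for a reactor polling its thread ring, stalls when one handler blocks on a result that only a later queued message can deliver.

use std::{sync::mpsc, time::Duration};

// Toy model of a reactor message loop. Message 1 blocks waiting for a
// completion that message 2 would deliver, but message 2 is not polled
// until message 1 returns -- the same shape of starvation that block_on
// can cause when entered from a thread-poll context.
fn main() {
    let (queue_tx, queue_rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
    let (done_tx, done_rx) = mpsc::channel::<()>();

    // Message 1: the "block_on" equivalent.
    queue_tx
        .send(Box::new(move || {
            match done_rx.recv_timeout(Duration::from_millis(100)) {
                Ok(()) => println!("completion received"),
                Err(_) => println!("timed out: the completion was queued behind us"),
            }
        }))
        .unwrap();

    // Message 2: would have produced the completion, but cannot run while
    // message 1 is blocking.
    queue_tx
        .send(Box::new(move || {
            let _ = done_tx.send(());
        }))
        .unwrap();
    drop(queue_tx);

    // Single-threaded "reactor": polls one message at a time.
    for msg in queue_rx {
        msg();
    }
}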

---

    fix(nvmf/target): remove usage of block_on
    
    Split creating the subsystem from starting it.
    This way we can start the subsystem on the master reactor, and then move
    on to the next SPDK subsystem.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
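
A condensed sketch of the resulting flow, using the names from the io-engine/src/subsys/nvmf/target.rs diff at the bottom of this page (logging trimmed; it relies on io-engine internals, so it is not a standalone snippet):

pub fn running(&mut self) {
    // Create the discovery subsystem synchronously...
    let discovery = self.create_discovery_subsystem();

    // ...then start it from a future on the master reactor rather than
    // blocking the reactor, and only advance SPDK subsystem init once the
    // subsystem has started.
    Reactors::master().send_future(async move {
        let nqn = discovery.get_nqn();
        if let Err(error) = discovery.start().await {
            error!("Error starting subsystem '{nqn}': {error}");
        }
        unsafe { spdk_subsystem_init_next(0) }
    });
}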

---

    fix(nexus-child/unplug): remove usage of block_on
    
    Initially this block_on was added because the remove callback ran in a blocking
    fashion, but this has since changed and unplug is now called from an async context.
    As such, we don't need the block_on and can simply call the async code directly.
    Also, simplify the completion notification, as we can simply close the sender.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix(nvmx/qpair): return errno with absolute value
    
    Otherwise a returned negative value translates into an unknown Errno.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
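
A minimal, self-contained illustration of the errno issue (it assumes the nix crate, which the qpair code already uses; ECONNREFUSED is just an example value):

use nix::errno::Errno;

// SPDK-style pollers report failures as negative errno values, while
// Errno::from_i32 expects the positive constant: the raw negative value
// maps to UnknownErrno, so the absolute value must be taken first.
fn main() {
    let rc: i32 = -(Errno::ECONNREFUSED as i32);
    assert_eq!(Errno::from_i32(rc), Errno::UnknownErrno);
    assert_eq!(Errno::from_i32(rc.abs()), Errno::ECONNREFUSED);
    println!("{rc} maps to {}", Errno::from_i32(rc.abs()));
}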

---

    feat: allow custom fabrics connect timeout
    
    Allows passing this via the NVMF_FABRICS_CONNECT_TIMEOUT environment variable.
    Also defaults it to 1s for now, rather than 500ms.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
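
A simplified, standalone sketch of this kind of env override (the real helper is try_from_env in io-engine/src/subsys/config/opts.rs, shown in the diff below; this version only approximates it):

use std::{env, fmt::{Debug, Display}, str::FromStr};

// Read an env variable, falling back to the default when it is unset or
// fails to parse.
fn try_from_env<T>(name: &str, default: T) -> T
where
    T: FromStr + Display + Copy,
    <T as FromStr>::Err: Debug + Display,
{
    match env::var(name) {
        Ok(value) => value.parse::<T>().unwrap_or_else(|error| {
            eprintln!("invalid {name} ({error}), using default {default}");
            default
        }),
        Err(_) => default,
    }
}

fn main() {
    // The timeout is in microseconds; the new default is 1s (was 500ms).
    let timeout_us: u64 = try_from_env("NVMF_FABRICS_CONNECT_TIMEOUT", 1_000_000);
    println!("fabrics connect timeout: {timeout_us} us");
}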

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
mayastor-bors and tiagolobocastro committed Aug 12, 2024
2 parents e7383a4 + 2192d31 commit d09082c
Showing 10 changed files with 84 additions and 82 deletions.
51 changes: 31 additions & 20 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
@@ -1380,30 +1380,41 @@ impl<'n> BdevOps for Nexus<'n> {
return;
}

let self_ptr = unsafe { unsafe_static_ptr(&self) };

Reactor::block_on(async move {
let self_ref = unsafe { &mut *self_ptr };

// TODO: double-check interaction with rebuild job logic
// TODO: cancel rebuild jobs?
let n = self_ref.children.iter().filter(|c| c.is_opened()).count();

if n > 0 {
warn!(
"{:?}: {} open children remain(s), closing...",
self_ref, n
);
let open_children =
self.children.iter().filter(|c| c.is_opened()).count();
// TODO: This doesn't seem possible to happen at this stage, but seems
// we should still try to handle this in separate future since
// we're handling it here anyway as a block_on is not safe to
// use for running production code.
if open_children > 0 {
tracing::error!(
"BUG => {open_children} child(ren) left open during nexus bdev destruction"
);
let self_ptr = unsafe { unsafe_static_ptr(&self) };
Reactor::block_on(async move {
let self_ref = unsafe { &mut *self_ptr };

// TODO: double-check interaction with rebuild job logic
// TODO: cancel rebuild jobs?
let n =
self_ref.children.iter().filter(|c| c.is_opened()).count();

if n > 0 {
warn!(
"{:?}: {} open children remain(s), closing...",
self_ref, n
);

for child in self_ref.children.iter() {
if child.is_opened() {
child.close().await.ok();
for child in self_ref.children.iter() {
if child.is_opened() {
child.close().await.ok();
}
}
}
}

self_ref.children.clear();
});
self_ref.children.clear();
});
}

self.as_mut().unregister_io_device();
unsafe {
2 changes: 1 addition & 1 deletion io-engine/src/bdev/nexus/nexus_bdev_children.rs
@@ -900,7 +900,7 @@ impl<'n> Nexus<'n> {
nexus_name,
child_device, "Unplugging nexus child device",
);
child.unplug();
child.unplug().await;
}
None => {
warn!(
34 changes: 12 additions & 22 deletions io-engine/src/bdev/nexus/nexus_child.rs
@@ -24,8 +24,6 @@ use crate::{
BlockDeviceHandle,
CoreError,
DeviceEventSink,
Reactor,
Reactors,
VerboseError,
},
eventing::replica_events::state_change_event_meta,
@@ -1109,7 +1107,7 @@ impl<'c> NexusChild<'c> {
/// underlying device is removed.
///
/// Note: The descriptor *must* be dropped for the unplug to complete.
pub(crate) fn unplug(&mut self) {
pub(crate) async fn unplug(&mut self) {
info!("{self:?}: unplugging child...");

let state = self.state();
@@ -1139,12 +1137,10 @@ impl<'c> NexusChild<'c> {
// device-related events directly.
if state != ChildState::Faulted(FaultReason::IoError) {
let nexus_name = self.parent.clone();
Reactor::block_on(async move {
match nexus_lookup_mut(&nexus_name) {
Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
None => error!("Nexus '{nexus_name}' not found"),
}
});
match nexus_lookup_mut(&nexus_name) {
Some(n) => n.reconfigure(DrEvent::ChildUnplug).await,
None => error!("Nexus '{nexus_name}' not found"),
}
}

if is_destroying {
@@ -1153,22 +1149,16 @@ impl<'c> NexusChild<'c> {
self.device_descriptor.take();
}

self.unplug_complete();
info!("{self:?}: child successfully unplugged");
self.unplug_complete().await;
}

/// Signal that the child unplug is complete.
fn unplug_complete(&self) {
let sender = self.remove_channel.0.clone();
let name = self.name.clone();
Reactors::current().send_future(async move {
if let Err(e) = sender.send(()).await {
error!(
"Failed to send unplug complete for child '{}': {}",
name, e
);
}
});
async fn unplug_complete(&self) {
if let Err(error) = self.remove_channel.0.send(()).await {
info!("{self:?}: failed to send unplug complete: {error}");
} else {
info!("{self:?}: child successfully unplugged");
}
}

/// create a new nexus child
7 changes: 5 additions & 2 deletions io-engine/src/bdev/nvmx/controller.rs
@@ -1072,8 +1072,11 @@ pub(crate) mod options {
self.admin_timeout_ms = Some(timeout);
self
}
pub fn with_fabrics_connect_timeout_us(mut self, timeout: u64) -> Self {
self.fabrics_connect_timeout_us = Some(timeout);
pub fn with_fabrics_connect_timeout_us<T: Into<Option<u64>>>(
mut self,
timeout: T,
) -> Self {
self.fabrics_connect_timeout_us = timeout.into();
self
}

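With the parameter now bounded by Into<Option<u64>>, call sites can pass either a bare value or an Option. A hypothetical usage sketch (the opts builder variable is illustrative, not part of this diff):

// Both forms compile under the new bound; None can also be passed to
// leave the timeout unset.
let opts = opts.with_fabrics_connect_timeout_us(1_000_000u64);
let opts = opts.with_fabrics_connect_timeout_us(Some(500_000u64));
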
4 changes: 2 additions & 2 deletions io-engine/src/bdev/nvmx/qpair.rs
@@ -468,9 +468,9 @@ impl<'a> Connection<'a> {
0 => Ok(false),
// Connection is still in progress, keep polling.
1 => Ok(true),
// Error occured during polling.
// Error occurred during polling.
e => {
let e = Errno::from_i32(-e);
let e = Errno::from_i32(e.abs());
error!(?self, "I/O qpair async connection polling error: {e}");
Err(e)
}
6 changes: 6 additions & 0 deletions io-engine/src/bdev/nvmx/uri.rs
@@ -227,6 +227,12 @@ impl<'probe> NvmeControllerContext<'probe> {
)
.with_transport_retry_count(
Config::get().nvme_bdev_opts.transport_retry_count as u8,
)
.with_fabrics_connect_timeout_us(
crate::subsys::config::opts::try_from_env(
"NVMF_FABRICS_CONNECT_TIMEOUT",
1_000_000,
),
);

let hostnqn = template.hostnqn.clone().or_else(|| {
9 changes: 8 additions & 1 deletion io-engine/src/core/reactor.rs
@@ -362,8 +362,15 @@ impl Reactor {
task
}

/// spawn a future locally on the current core block until the future is
/// Spawns a future locally on the current core block until the future is
/// completed. The master core is used.
/// # Warning
/// This code should only be used for testing and not running production!
/// This is because when calling block_on from a thread_poll callback, we
/// may be leaving messages behind, which can lead to timeouts etc...
/// A work-around to make this safe could be to potentially "pull" the
/// messages which haven't been polled, and poll them here before
/// proceeding to re-poll via thread_poll again.
pub fn block_on<F, R>(future: F) -> Option<R>
where
F: Future<Output = R> + 'static,
17 changes: 0 additions & 17 deletions io-engine/src/grpc/mod.rs
@@ -2,7 +2,6 @@ use futures::channel::oneshot::Receiver;
use nix::errno::Errno;
pub use server::MayastorGrpcServer;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
time::Duration,
@@ -158,22 +157,6 @@ macro_rules! spdk_submit {

pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;

/// call the given future within the context of the reactor on the first core
/// on the init thread, while the future is waiting to be completed the reactor
/// is continuously polled so that forward progress can be made
pub fn rpc_call<G, I, L, A>(future: G) -> Result<Response<A>, tonic::Status>
where
G: Future<Output = Result<I, L>> + 'static,
I: 'static,
L: Into<Status> + Error + 'static,
A: 'static + From<I>,
{
Reactor::block_on(future)
.unwrap()
.map(|r| Response::new(A::from(r)))
.map_err(|e| e.into())
}

/// Submit rpc code to the primary reactor.
pub fn rpc_submit<F, R, E>(
future: F,
2 changes: 1 addition & 1 deletion io-engine/src/subsys/config/opts.rs
@@ -174,7 +174,7 @@ pub struct NvmfTcpTransportOpts {
}

/// try to read an env variable or returns the default when not found
fn try_from_env<T>(name: &str, default: T) -> T
pub(crate) fn try_from_env<T>(name: &str, default: T) -> T
where
T: FromStr + Display + Copy,
<T as FromStr>::Err: Debug + Display,
34 changes: 18 additions & 16 deletions io-engine/src/subsys/nvmf/target.rs
@@ -28,7 +28,7 @@ use spdk_rs::libspdk::{

use crate::{
constants::NVME_CONTROLLER_MODEL_ID,
core::{Cores, Mthread, Reactor, Reactors},
core::{Cores, Mthread, Reactors},
ffihelper::{AsStr, FfiResult},
subsys::{
nvmf::{
@@ -277,9 +277,9 @@ impl Target {
Ok(())
}

/// enable discovery for the target -- note that the discovery system is not
/// started
fn enable_discovery(&self) {
/// Create the discovery for the target -- note that the discovery system is
/// not started.
fn create_discovery_subsystem(&self) -> NvmfSubsystem {
debug!("enabling discovery for target");
let discovery = unsafe {
NvmfSubsystem::from(spdk_nvmf_subsystem_create(
@@ -303,12 +303,7 @@

discovery.allow_any(true);

Reactor::block_on(async {
let nqn = discovery.get_nqn();
if let Err(e) = discovery.start().await {
error!("Error starting subsystem '{}': {}", nqn, e.to_string());
}
});
discovery
}

/// stop all subsystems on this target we are borrowed here
@@ -362,13 +357,20 @@

/// Final state for the target during init.
pub fn running(&mut self) {
self.enable_discovery();
info!(
"nvmf target accepting new connections and is ready to roll..{}",
'\u{1F483}'
);
let discovery = self.create_discovery_subsystem();

unsafe { spdk_subsystem_init_next(0) }
Reactors::master().send_future(async move {
let nqn = discovery.get_nqn();
if let Err(error) = discovery.start().await {
error!("Error starting subsystem '{nqn}': {error}");
}

info!(
"nvmf target accepting new connections and is ready to roll..{}",
'\u{1F483}'
);
unsafe { spdk_subsystem_init_next(0) }
})
}

/// Shutdown procedure.
