Skip to content

Commit

Permalink
Merge #1672
Browse files Browse the repository at this point in the history
1672: feat(rebuild): prefer a local replica as a rebuild source r=dsavitskiy a=dsavitskiy

This would reduce network traffic when rebuilding a remote replica and a local one is healthy.

Co-authored-by: Dmitry Savitskiy <dmitry.savitskiy@datacore.com>
  • Loading branch information
mayastor-bors and dsavitskiy committed Jun 14, 2024
2 parents b146fa7 + c3816b9 commit 044de92
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 9 deletions.
7 changes: 6 additions & 1 deletion io-engine-tests/src/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ impl NexusBuilder {
self.with_bdev(&bdev)
}

pub fn with_replicas(self, replicas: &[ReplicaBuilder]) -> Self {
let cc = replicas.iter().map(|r| self.replica_uri(r)).collect();
self.with_children(cc)
}

pub fn with_local_replica(self, r: &ReplicaBuilder) -> Self {
if r.rpc() != self.rpc() {
panic!("Replica is not local");
Expand All @@ -152,7 +157,7 @@ impl NexusBuilder {
self
}

fn replica_uri(&self, r: &ReplicaBuilder) -> String {
pub fn replica_uri(&self, r: &ReplicaBuilder) -> String {
if r.rpc() == self.rpc() {
r.bdev()
} else {
Expand Down
26 changes: 18 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,11 @@ impl<'n> Nexus<'n> {
info!("{self:?}: start rebuild request for {child_uri}");

// Find a healthy child to rebuild from.
let src_child_uri = match self
.children_iter()
.find(|c| c.is_healthy() && c.uri() != child_uri)
{
Some(child) => Ok(child.uri().to_owned()),
None => Err(Error::NoRebuildSource {
let Some(src_child_uri) = self.find_src_replica(child_uri) else {
return Err(Error::NoRebuildSource {
name: name.clone(),
}),
}?;
});
};

let dst_child_uri = match self.lookup_child(child_uri) {
Some(c) if c.is_opened_unsync() => {
Expand Down Expand Up @@ -157,6 +153,20 @@ impl<'n> Nexus<'n> {
})
}

/// Finds the best suited source replica for the given destination.
fn find_src_replica(&self, dst_uri: &str) -> Option<String> {
let candidates: Vec<_> = self
.children_iter()
.filter(|c| c.is_healthy() && c.uri() != dst_uri)
.collect();

candidates
.iter()
.find(|c| c.is_local().unwrap_or(false))
.or_else(|| candidates.first())
.map(|c| c.uri().to_owned())
}

/// TODO
async fn create_rebuild_job(
&self,
Expand Down
222 changes: 222 additions & 0 deletions io-engine/tests/nexus_rebuild_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
pub mod common;

use common::{
compose::{
rpc::v1::{
nexus::{ChildState, ChildStateReason},
GrpcConnect,
SharedRpcHandle,
},
Binary,
Builder,
},
nexus::NexusBuilder,
pool::PoolBuilder,
replica::ReplicaBuilder,
};

const POOL_SIZE: u64 = 200;
const REPL_SIZE: u64 = 50;
const NEXUS_SIZE: u64 = REPL_SIZE;

struct TestNode {
idx: usize,
ms: SharedRpcHandle,
pool: PoolBuilder,
replicas: Vec<ReplicaBuilder>,
}

impl TestNode {
async fn next_replica(&mut self) -> ReplicaBuilder {
let mut repl = ReplicaBuilder::new(self.ms.clone())
.with_pool(&self.pool)
.with_name(&format!(
"repl_{i}_{j}",
i = self.idx,
j = self.replicas.len()
))
.with_new_uuid()
.with_size_mb(REPL_SIZE);

repl.create().await.unwrap();
repl.share().await.unwrap();
self.replicas.push(repl.clone());
repl
}

async fn clear(&mut self) {
for i in 0 .. self.replicas.len() {
self.replicas[i].destroy().await.unwrap();
}
self.replicas.clear();
}
}

async fn test_src_selection(
nodes: &mut Vec<TestNode>,
nex_node: usize,
child_cfg: Vec<usize>,
dst: usize,
expected_src_idx: usize,
) {
let to = std::time::Duration::from_secs(1);

let mut replicas = Vec::new();
for i in 0 .. child_cfg.len() {
replicas.push(nodes[child_cfg[i]].next_replica().await);
}

let mut nex = NexusBuilder::new(nodes[nex_node].ms.clone())
.with_name("nexus0")
.with_new_uuid()
.with_size_mb(NEXUS_SIZE)
.with_replicas(&replicas);

nex.create().await.unwrap();

println!("---------");
println!(
"> {child_cfg:?}: expect to rebuild #{dst} from #{expected_src_idx}"
);
let children = nex.get_nexus().await.unwrap().children;

for (idx, child) in children.iter().enumerate() {
println!(" [{idx}] {c:?}", c = child.uri);
}

let r = &replicas[dst];
println!(" rebuilding #{dst}: {uri}", uri = nex.replica_uri(r));

nex.offline_child_replica(r).await.unwrap();
nex.wait_replica_state(
r,
ChildState::Degraded,
Some(ChildStateReason::ByClient),
to,
)
.await
.unwrap();
nex.online_child_replica(r).await.unwrap();
nex.wait_children_online(to).await.unwrap();

let rec = nex
.get_rebuild_history()
.await
.unwrap()
.first()
.unwrap()
.clone();

let dst_idx = children
.iter()
.position(|c| c.uri == rec.child_uri)
.unwrap();
let src_idx = children.iter().position(|c| c.uri == rec.src_uri).unwrap();

println!(
" rebuilt #{dst_idx}: {dst} from #{src_idx}: {src}",
src = rec.src_uri,
dst = rec.child_uri
);

assert_eq!(
src_idx, expected_src_idx,
"Expected child index {expected_src_idx}, got {src_idx}"
);

nex.destroy().await.unwrap();
for node in nodes {
node.clear().await;
}
}

/// Should prefer a local replica for rebuild source.
#[tokio::test]
async fn nexus_rebuild_prefer_local_replica() {
common::composer_init();

let test = Builder::new()
.name("cargo-test")
.network("10.1.0.0/16")
.unwrap()
.add_container_bin(
"ms_0",
Binary::from_dbg("io-engine").with_args(vec![
"-l",
"1,2",
"-Fcolor,compact,host,nodate",
]),
)
.add_container_bin(
"ms_1",
Binary::from_dbg("io-engine").with_args(vec![
"-l",
"3,4",
"-Fcolor,compact,host,nodate",
]),
)
.add_container_bin(
"ms_2",
Binary::from_dbg("io-engine").with_args(vec![
"-l",
"5,6",
"-Fcolor,compact,host,nodate",
]),
)
.with_clean(true)
.build()
.await
.unwrap();

let conn = GrpcConnect::new(&test);

let mut nodes = Vec::new();

for idx in 0 .. 3 {
let ms = conn.grpc_handle_shared(&format!("ms_{idx}")).await.unwrap();

let mut pool = PoolBuilder::new(ms.clone())
.with_name(&format!("pool_{idx}"))
.with_new_uuid()
.with_malloc(&format!("mem_{idx}"), POOL_SIZE);

pool.create().await.unwrap();

nodes.push(TestNode {
idx,
ms,
pool,
replicas: Vec::new(),
});
}

// All local, should select first avail.
test_src_selection(&mut nodes, 0, vec![0, 0, 0], 0, 1).await;
test_src_selection(&mut nodes, 0, vec![0, 0, 0], 1, 0).await;
test_src_selection(&mut nodes, 0, vec![0, 0, 0], 2, 0).await;

// Local-remote-remote, should prefer the local one (here it is #0).
test_src_selection(&mut nodes, 0, vec![0, 1, 2], 0, 1).await;
test_src_selection(&mut nodes, 0, vec![0, 1, 2], 1, 0).await;
test_src_selection(&mut nodes, 0, vec![0, 1, 2], 2, 0).await;

// Remote-local-remote, should prefer the local one (here it is #1).
test_src_selection(&mut nodes, 0, vec![1, 0, 2], 0, 1).await;
test_src_selection(&mut nodes, 0, vec![1, 0, 2], 1, 0).await;
test_src_selection(&mut nodes, 0, vec![1, 0, 2], 2, 1).await;

// Remote-remote-local, should prefer the local one (here it is #2).
test_src_selection(&mut nodes, 0, vec![1, 2, 0], 0, 2).await;
test_src_selection(&mut nodes, 0, vec![1, 2, 0], 1, 2).await;
test_src_selection(&mut nodes, 0, vec![1, 2, 0], 2, 0).await;

// Remote-local-local, should prefer the first avail local one (#1 or #2).
test_src_selection(&mut nodes, 0, vec![1, 0, 0], 0, 1).await;
test_src_selection(&mut nodes, 0, vec![1, 0, 0], 1, 2).await;
test_src_selection(&mut nodes, 0, vec![1, 0, 0], 2, 1).await;

// All remote, should prefer the first avail.
test_src_selection(&mut nodes, 0, vec![1, 1, 1], 0, 1).await;
test_src_selection(&mut nodes, 0, vec![1, 1, 1], 1, 0).await;
test_src_selection(&mut nodes, 0, vec![1, 1, 1], 2, 0).await;
}

0 comments on commit 044de92

Please sign in to comment.