Skip to content

Commit

Permalink
cluster: sort accepted view-change proposals
Browse files Browse the repository at this point in the history
This makes it possible to search a MultiNodeCut by SocketAddr without
doing a linear scan.

Unfortunately, SocketAddr doesn't implement Ord, so we have to base the
sort on tuples of (ip, port).

related: rust-lang/rust#72239
  • Loading branch information
nytopop committed May 24, 2020
1 parent 9f0ce60 commit 268fa86
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
28 changes: 28 additions & 0 deletions src/cluster/cut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
collections::HashMap,
convert::TryInto,
net::SocketAddr,
ops::Index,
result,
sync::{Arc, Weak},
};
Expand Down Expand Up @@ -89,6 +90,21 @@ pub struct MultiNodeCut {
pub(crate) kicked: Arc<[Member]>,
}

impl Index<SocketAddr> for MultiNodeCut {
type Output = Member;

/// Binary search for the provided `addr`.
///
/// O(log n)
///
/// # Panics
/// Panics if a member with `addr` doesn't exist in the configuration.
#[inline]
fn index(&self, addr: SocketAddr) -> &Self::Output {
self.lookup(addr).unwrap()
}
}

impl MultiNodeCut {
/// Returns the number of cuts that were skipped between this and the last received
/// cut.
Expand Down Expand Up @@ -139,6 +155,18 @@ impl MultiNodeCut {
pub fn kicked(&self) -> &Arc<[Member]> {
&self.kicked
}

/// Lookup a specific member in the configuration by socket address.
///
/// Executes in O(log n) time.
pub fn lookup(&self, addr: SocketAddr) -> Option<&Member> {
let key = |s: SocketAddr| (s.ip(), s.port());

self.members
.binary_search_by_key(&key(addr), |m| key(m.addr()))
.ok()
.map(|i| &self.members[i])
}
}

/// A cluster member.
Expand Down
13 changes: 10 additions & 3 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,21 @@ impl<St: partition::Strategy> Cluster<St> {

let local_node = self.local_node();

joined.sort_by_key(|m| (m.addr().ip(), m.addr().port()));
kicked.sort_by_key(|m| (m.addr().ip(), m.addr().port()));

let mut members: Vec<_> = (state.nodes.iter())
.map(|node| self.resolve_member(state, node).unwrap())
.collect();

members.sort_by_key(|m| (m.addr().ip(), m.addr().port()));

let cut = MultiNodeCut {
skipped: 0,
local_addr: self.addr,
degraded: !state.nodes.contains(&local_node),
conf_id: state.refresh_config(),
members: (state.nodes.iter())
.map(|node| self.resolve_member(state, node).unwrap())
.collect(),
members: members.into(),
joined: joined.into(),
kicked: kicked.into(),
};
Expand Down
12 changes: 9 additions & 3 deletions src/cluster/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,20 @@ impl<St: Strategy> Cluster<St> {
assert!(state.uuids.insert(uuid));
}

joined.sort_by_key(|m| (m.addr().ip(), m.addr().port()));

let mut members: Vec<_> = (state.nodes.iter())
.map(|node| self.resolve_member(&state, node).unwrap())
.collect();

members.sort_by_key(|m| (m.addr().ip(), m.addr().port()));

let cut = MultiNodeCut {
skipped: 0,
local_addr: self.addr,
degraded: !state.nodes.contains(&self.local_node()),
conf_id: state.refresh_config(),
members: (state.nodes.iter())
.map(|node| self.resolve_member(&state, node).unwrap())
.collect(),
members: members.into(),
joined: joined.into(),
kicked: vec![].into(),
};
Expand Down

0 comments on commit 268fa86

Please sign in to comment.