Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unsub backoff spec changes #2403

Merged
merged 10 commits into from
Jan 17, 2022
4 changes: 4 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

- Fix `GossipsubConfigBuilder::build()` requiring `&self` to live for `'static` (see [PR 2409])

- Implement Unsubscribe backoff as per [libp2p specs PR 383] (see [PR 2403]).

[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346
[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2327]: https://github.com/libp2p/rust-libp2p/pull/2327
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
[PR 2409]: https://github.com/libp2p/rust-libp2p/pull/2409
[PR 2403]: https://github.com/libp2p/rust-libp2p/pull/2403
[libp2p specs PR 383]: https://github.com/libp2p/specs/pull/383

# 0.34.0 [2021-11-16]

Expand Down
26 changes: 21 additions & 5 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ where
topic_hash: &TopicHash,
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> GossipsubControlAction {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
Expand Down Expand Up @@ -1088,14 +1089,19 @@ where
Vec::new()
};

let backoff = if on_unsubscribe {
self.config.unsubscribe_backoff()
} else {
self.config.prune_backoff()
};

// update backoff
self.backoffs
.update_backoff(topic_hash, peer, self.config.prune_backoff());
self.backoffs.update_backoff(topic_hash, peer, backoff);

GossipsubControlAction::Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(self.config.prune_backoff().as_secs()),
backoff: Some(backoff.as_secs()),
}
}

Expand All @@ -1111,7 +1117,9 @@ where
for peer in peers {
// Send a PRUNE control message
debug!("LEAVE: Sending PRUNE to peer: {:?}", peer);
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);

// If the peer did not previously exist in any mesh, inform the handler
Expand Down Expand Up @@ -1487,9 +1495,10 @@ where

if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let prune_messages = to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px))
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect();
// Send the prune messages to the peer
debug!(
Expand Down Expand Up @@ -2598,6 +2607,9 @@ where
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
if let Some(topics) = to_prune.remove(&peer) {
let mut prunes = topics
.iter()
Expand All @@ -2606,6 +2618,7 @@ where
topic_hash,
&peer,
self.config.do_px() && !no_px.contains(&peer),
on_unsubscribe,
)
})
.collect::<Vec<_>>();
Expand All @@ -2630,13 +2643,16 @@ where
}

// handle the remaining prunes
// The following prunes are not due to unsubscribing.
let on_unsubscribe = false;
for (peer, topics) in to_prune.iter() {
let mut remaining_prunes = Vec::new();
for topic_hash in topics {
let prune = self.make_prune(
topic_hash,
peer,
self.config.do_px() && !no_px.contains(peer),
on_unsubscribe,
);
remaining_prunes.push(prune);
// inform the handler
Expand Down
71 changes: 71 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,77 @@ mod tests {
);
}

#[test]
fn test_unsubscribe_backoff() {
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100);
let config = GossipsubConfigBuilder::default()
.backoff_slack(1)
// ensure a prune_backoff > unsubscribe_backoff
.prune_backoff(Duration::from_secs(5))
.unsubscribe_backoff(1)
.heartbeat_interval(HEARTBEAT_INTERVAL)
.build()
.unwrap();

let topic = String::from("test");
// only one peer => mesh too small and will try to regraft as early as possible
let (mut gs, _, topics) = inject_nodes1()
.peer_no(1)
.topics(vec![topic.clone()])
.to_subscribe(true)
.gs_config(config)
.create_network();

let _ = gs.unsubscribe(&Topic::new(topic.clone()));

assert_eq!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Prune { backoff, .. } => backoff == &Some(1),
_ => false,
}),
1,
"Peer should be pruned with `unsubscribe_backoff`."
);

let _ = gs.subscribe(&Topic::new(topics[0].to_string()));

// forget all events until now
flush_events(&mut gs);

// call heartbeat
gs.heartbeat();

// Sleep for one second and apply 10 regular heartbeats (interval = 100ms).
for _ in 0..10 {
sleep(HEARTBEAT_INTERVAL);
gs.heartbeat();
}

// Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat
// is needed).
assert_eq!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Graft { .. } => true,
_ => false,
}),
0,
"Graft message created too early within backoff period"
);

// Heartbeat one more time this should graft now
sleep(HEARTBEAT_INTERVAL);
gs.heartbeat();

// check that graft got created
assert!(
count_control_msgs(&gs, |_, m| match m {
GossipsubControlAction::Graft { .. } => true,
_ => false,
}) > 0,
"No graft message was created after backoff period"
);
}

#[test]
fn test_flood_publish() {
let config: GossipsubConfig = GossipsubConfig::default();
Expand Down
26 changes: 26 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub struct GossipsubConfig {
do_px: bool,
prune_peers: usize,
prune_backoff: Duration,
unsubscribe_backoff: Duration,
backoff_slack: u32,
flood_publish: bool,
graft_flood_threshold: Duration,
Expand Down Expand Up @@ -276,6 +277,15 @@ impl GossipsubConfig {
self.prune_backoff
}

/// Controls the backoff time when unsubscribing from a topic.
///
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
/// is 10 seconds.
pub fn unsubscribe_backoff(&self) -> Duration {
self.unsubscribe_backoff
}

/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
/// solves problems occuring through high latencies. In particular if
Expand Down Expand Up @@ -421,6 +431,7 @@ impl Default for GossipsubConfigBuilder {
do_px: false,
prune_peers: 0, // NOTE: Increasing this currently has little effect until Signed records are implemented.
prune_backoff: Duration::from_secs(60),
unsubscribe_backoff: Duration::from_secs(10),
backoff_slack: 1,
flood_publish: true,
graft_flood_threshold: Duration::from_secs(10),
Expand Down Expand Up @@ -636,6 +647,16 @@ impl GossipsubConfigBuilder {
self
}

/// Controls the backoff time when unsubscribing from a topic.
///
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
/// is 10 seconds.
pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
self
}

/// Number of heartbeat slots considered as slack for backoffs. This gurantees that we wait
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
/// solves problems occuring through high latencies. In particular if
Expand Down Expand Up @@ -777,6 +798,11 @@ impl GossipsubConfigBuilder {
"The following inequality doesn't hold mesh_outbound_min <= self.config.mesh_n / 2",
);
}

if self.config.unsubscribe_backoff.as_millis() == 0 {
return Err("The unsubscribe_backoff parameter should be positive.");
}

Ok(self.config.clone())
}
}
Expand Down