Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/authority-discovery: Publish and query on exponential interval (
Browse files Browse the repository at this point in the history
…#7545)

* client/authority-discovery: Publish and query on exponential interval

When a node starts up publishing and querying might fail due to various
reasons, for example due to being not yet fully bootstrapped on the DHT.
Thus one should retry rather sooner than later. On the other hand, a
long running node is likely well connected and thus timely retries are
not needed. For this reasoning use an exponentially increasing interval
for `publish_interval`, `query_interval` and
`priority_group_set_interval` instead of a constant interval.

* client/authority-discovery/src/interval.rs: Add license header

* .maintain/gitlab: Ensure adder collator tests are run on CI
  • Loading branch information
mxinden authored Nov 23, 2020
1 parent f16acff commit 0840c58
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 136 deletions.
3 changes: 3 additions & 0 deletions .maintain/gitlab/check_polkadot_companion_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@ cd polkadot
# Test Polkadot pr or master branch with this Substrate commit.
cargo update -p sp-io
time cargo test --all --release --verbose --features=real-overseer

cd parachain/test-parachains/adder/collator/
time cargo test --release --verbose --locked --features=real-overseer
62 changes: 62 additions & 0 deletions client/authority-discovery/src/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use futures::stream::Stream;
use futures::future::FutureExt;
use futures::ready;
use futures_timer::Delay;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// Exponentially increasing interval
///
/// Doubles interval duration on each tick until the configured maximum is reached.
pub struct ExpIncInterval {
max: Duration,
next: Duration,
delay: Delay,
}

impl ExpIncInterval {
/// Create a new [`ExpIncInterval`].
pub fn new(start: Duration, max: Duration) -> Self {
let delay = Delay::new(start);
Self {
max,
next: start * 2,
delay,
}
}

/// Fast forward the exponentially increasing interval to the configured maximum.
pub fn set_to_max(&mut self) {
self.next = self.max;
self.delay = Delay::new(self.next);
}
}

impl Stream for ExpIncInterval {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
ready!(self.delay.poll_unpin(cx));
self.delay = Delay::new(self.next);
self.next = std::cmp::min(self.max, self.next * 2);

Poll::Ready(Some(()))
}
}
51 changes: 21 additions & 30 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,50 +38,41 @@ use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;

mod error;
mod interval;
mod service;
mod worker;

#[cfg(test)]
mod tests;
mod worker;

/// Configuration of [`Worker`].
pub struct WorkerConfig {
/// The interval in which the node will publish its own address on the DHT.
/// The maximum interval in which the node will publish its own address on the DHT.
///
/// By default this is set to 12 hours.
pub publish_interval: Duration,
/// The interval in which the node will query the DHT for new entries.
/// By default this is set to 1 hour.
pub max_publish_interval: Duration,
/// The maximum interval in which the node will query the DHT for new entries.
///
/// By default this is set to 10 minutes.
pub query_interval: Duration,
/// The time the node will wait before triggering the first DHT query or publish.
///
/// By default this is set to 30 seconds.
///
/// This default is based on the rough boostrap time required by libp2p Kademlia.
pub query_start_delay: Duration,
/// The interval in which the worker will instruct the peerset to connect to a random subset
/// of discovered validators.
///
/// By default this is set to 10 minutes.
pub priority_group_set_interval: Duration,
/// The time the worker will wait after each query interval tick to pass a subset of
/// the cached authority addresses down to the peerset.
///
/// Be aware that the actual delay will be computed by [`Self::query_start_delay`] +
/// [`Self::priority_group_set_start_delay`]
///
/// By default this is set to 5 minutes.
pub priority_group_set_offset: Duration,
pub max_query_interval: Duration,
}

impl Default for WorkerConfig {
fn default() -> Self {
Self {
publish_interval: Duration::from_secs(12 * 60 * 60),
query_interval: Duration::from_secs(10 * 60),
query_start_delay: Duration::from_secs(30),
priority_group_set_interval: Duration::from_secs(10 * 60),
priority_group_set_offset: Duration::from_secs(5 * 60),
// Kademlia's default time-to-live for Dht records is 36h, republishing records every
// 24h through libp2p-kad. Given that a node could restart at any point in time, one can
// not depend on the republishing process, thus publishing own external addresses should
// happen on an interval < 36h.
max_publish_interval: Duration::from_secs(1 * 60 * 60),
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current and next authorities is a trade
// off between efficiency and performance.
//
// Querying 700 [`AuthorityId`]s takes ~8m on the Kusama DHT (16th Nov 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`.
max_query_interval: Duration::from_secs(10 * 60),
}
}
}
Expand Down
73 changes: 30 additions & 43 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::{Error, Result}, ServicetoWorkerMsg};
use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg};

use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;

use addr_cache::AddrCache;
use async_trait::async_trait;
Expand Down Expand Up @@ -54,8 +53,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
#[cfg(test)]
pub mod tests;

type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;

const LOG_TARGET: &'static str = "sub-authority-discovery";

/// Name of the Substrate peerset priority group for authorities discovered through the authority
Expand Down Expand Up @@ -113,12 +110,12 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
dht_event_rx: DhtEventStream,

/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
publish_interval: ExpIncInterval,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: Interval,
query_interval: ExpIncInterval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: Interval,
priority_group_set_interval: ExpIncInterval,

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
Expand Down Expand Up @@ -153,31 +150,26 @@ where
prometheus_registry: Option<prometheus_endpoint::Registry>,
config: crate::WorkerConfig,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing
// records every 24h through libp2p-kad.
// Given that a node could restart at any point in time, one can not depend on the
// republishing process, thus publishing own external addresses should happen on an interval
// < 36h.
let publish_interval = interval_at(
Instant::now() + config.query_start_delay,
config.publish_interval,
// When a node starts up publishing and querying might fail due to various reasons, for
// example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather
// sooner than later. On the other hand, a long running node is likely well connected and
// thus timely retries are not needed. For this reasoning use an exponentially increasing
// interval for `publish_interval`, `query_interval` and `priority_group_set_interval`
// instead of a constant interval.
let publish_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_publish_interval,
);

// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + config.query_start_delay;
let query_interval_duration = config.query_interval;
let query_interval = interval_at(query_interval_start, query_interval_duration);

// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above,
// just delayed by 5 minutes by default.
let priority_group_set_interval = interval_at(
query_interval_start + config.priority_group_set_offset,
config.priority_group_set_interval,
let query_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_query_interval,
);
let priority_group_set_interval = ExpIncInterval::new(
Duration::from_secs(2),
// Trade-off between node connection churn and connectivity. Using half of
// [`crate::WorkerConfig::max_query_interval`] to update priority group once at the
// beginning and once in the middle of each query interval.
config.max_query_interval / 2,
);

let addr_cache = AddrCache::new();
Expand Down Expand Up @@ -413,7 +405,7 @@ where
}

if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
Expand Down Expand Up @@ -449,6 +441,11 @@ where
}
},
DhtEvent::ValuePut(hash) => {
// Fast forward the exponentially increasing interval to the configured maximum. In
// case this was the first successful address publishing there is no need for a
// timely retry.
self.publish_interval.set_to_max();

if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
Expand Down Expand Up @@ -661,16 +658,6 @@ fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
}

fn interval_at(start: Instant, duration: Duration) -> Interval {
let stream = futures::stream::unfold(start, move |next| {
let time_until_next = next.saturating_duration_since(Instant::now());

Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
});

Box::new(stream)
}

/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
Expand Down
60 changes: 0 additions & 60 deletions client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,66 +37,6 @@ use substrate_test_runtime_client::runtime::Block;

use super::*;

#[test]
fn interval_at_with_start_now() {
let start = Instant::now();

let mut interval = interval_at(
std::time::Instant::now(),
std::time::Duration::from_secs(10),
);

futures::executor::block_on(async {
interval.next().await;
});

assert!(
Instant::now().saturating_duration_since(start) < Duration::from_secs(1),
"Expected low resolution instant interval to fire within less than a second.",
);
}

#[test]
fn interval_at_is_queuing_ticks() {
let start = Instant::now();

let interval = interval_at(start, std::time::Duration::from_millis(100));

// Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms, 3rd
// at 200ms).
std::thread::sleep(Duration::from_millis(200));

futures::executor::block_on(async {
interval.take(3).collect::<Vec<()>>().await;
});

// Make sure we did not wait for more than 300 ms, which would imply that `at_interval` is
// not queuing ticks.
assert!(
Instant::now().saturating_duration_since(start) < Duration::from_millis(300),
"Expect interval to /queue/ events when not polled for a while.",
);
}

#[test]
fn interval_at_with_initial_delay() {
let start = Instant::now();

let mut interval = interval_at(
std::time::Instant::now() + Duration::from_millis(100),
std::time::Duration::from_secs(10),
);

futures::executor::block_on(async {
interval.next().await;
});

assert!(
Instant::now().saturating_duration_since(start) > Duration::from_millis(100),
"Expected interval with initial delay not to fire right away.",
);
}

#[derive(Clone)]
pub(crate) struct TestApi {
pub(crate) authorities: Vec<AuthorityId>,
Expand Down
6 changes: 3 additions & 3 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Err(e) => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Expand All @@ -704,7 +704,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)),
Err(e) => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Expand All @@ -716,7 +716,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
Err(e) => debug!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}
Expand Down

0 comments on commit 0840c58

Please sign in to comment.