This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

client/authority-discovery: Publish and query on exponential interval #7545

Merged · 5 commits · Nov 23, 2020
3 changes: 3 additions & 0 deletions .maintain/gitlab/check_polkadot_companion_build.sh
@@ -92,3 +92,6 @@ cd polkadot
# Test Polkadot pr or master branch with this Substrate commit.
cargo update -p sp-io
time cargo test --all --release --verbose --features=real-overseer

cd parachain/test-parachains/adder/collator/
time cargo test --release --verbose --locked --features=real-overseer
62 changes: 62 additions & 0 deletions client/authority-discovery/src/interval.rs
@@ -0,0 +1,62 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use futures::stream::Stream;
use futures::future::FutureExt;
use futures::ready;
use futures_timer::Delay;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// Exponentially increasing interval.
///
/// Doubles interval duration on each tick until the configured maximum is reached.
pub struct ExpIncInterval {
    max: Duration,
    next: Duration,
    delay: Delay,
}

impl ExpIncInterval {
    /// Create a new [`ExpIncInterval`].
    pub fn new(start: Duration, max: Duration) -> Self {
        let delay = Delay::new(start);
        Self {
            max,
            next: start * 2,
            delay,
        }
    }

    /// Fast forward the exponentially increasing interval to the configured maximum.
    pub fn set_to_max(&mut self) {
        self.next = self.max;
        self.delay = Delay::new(self.next);
    }
}

impl Stream for ExpIncInterval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        ready!(self.delay.poll_unpin(cx));
        self.delay = Delay::new(self.next);
bkchr (Member) suggested a change:

-        self.delay = Delay::new(self.next);
+        self.delay = Delay::new(self.next);
+        // Poll once to register the future in the current task.
+        let _ = self.delay.poll_unpin(cx);

tomaka (Contributor): Indeed, this needs to be done; otherwise the thing doesn't work at all. However, I'd suggest putting a loop around the entire body of poll_next instead of doing a dummy poll.

mxinden (Contributor, Author): Good catch @bkchr, thanks!

@tomaka, can you expand on your idea of a loop around the entire body?

tomaka (Contributor): Wait, no, you actually don't need this. (This code had passed my review the other day.)

Since you return Poll::Ready, the caller has to call poll_next again, and this dummy registration isn't necessary.

To phrase it differently: the documentation of poll_next mentions that you pass a Waker that is woken up once the task is ready. In this case, you're returning Ready, so there's no need to register the Waker.

bkchr (Member): Ahh yeah, I mixed Stream with Future. Sorry!
        self.next = std::cmp::min(self.max, self.next * 2);

        Poll::Ready(Some(()))
    }
}
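To illustrate the behavior, here is a minimal, hypothetical usage sketch (not part of the diff; it assumes `ExpIncInterval` and the `futures` executor are in scope). Ticks fire after roughly 2s, 4s, 8s, and so on, capped at the configured maximum. Because `poll_next` returns `Poll::Ready` on each tick, the caller polls again for the next one, which is why no extra `Waker` registration is needed, as the review thread above concludes.

use std::time::Duration;
use futures::StreamExt;

fn main() {
    futures::executor::block_on(async {
        // Start at 2s, doubling up to a 60s maximum: waits of 2s, 4s, 8s, 16s, 32s, 60s, 60s, ...
        let mut interval = ExpIncInterval::new(Duration::from_secs(2), Duration::from_secs(60));

        for _ in 0..3 {
            interval.next().await; // Waits for the current `Delay`, then arms the next, doubled one.
        }

        // After a first success there is no need for timely retries; skip the remaining
        // doubling steps.
        interval.set_to_max();
        interval.next().await; // Fires after the full 60s maximum.
    });
}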
51 changes: 21 additions & 30 deletions client/authority-discovery/src/lib.rs
@@ -38,50 +38,41 @@ use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;

mod error;
mod interval;
mod service;
mod worker;

#[cfg(test)]
mod tests;
mod worker;

/// Configuration of [`Worker`].
pub struct WorkerConfig {
/// The interval in which the node will publish its own address on the DHT.
/// The maximum interval in which the node will publish its own address on the DHT.
///
/// By default this is set to 12 hours.
pub publish_interval: Duration,
/// The interval in which the node will query the DHT for new entries.
/// By default this is set to 1 hour.
pub max_publish_interval: Duration,
/// The maximum interval in which the node will query the DHT for new entries.
///
/// By default this is set to 10 minutes.
pub query_interval: Duration,
/// The time the node will wait before triggering the first DHT query or publish.
///
/// By default this is set to 30 seconds.
///
/// This default is based on the rough bootstrap time required by libp2p Kademlia.
pub query_start_delay: Duration,
/// The interval in which the worker will instruct the peerset to connect to a random subset
/// of discovered validators.
///
/// By default this is set to 10 minutes.
pub priority_group_set_interval: Duration,
/// The time the worker will wait after each query interval tick to pass a subset of
/// the cached authority addresses down to the peerset.
///
/// Be aware that the actual delay will be computed by [`Self::query_start_delay`] +
/// [`Self::priority_group_set_start_delay`]
///
/// By default this is set to 5 minutes.
pub priority_group_set_offset: Duration,
pub max_query_interval: Duration,
}

impl Default for WorkerConfig {
fn default() -> Self {
Self {
publish_interval: Duration::from_secs(12 * 60 * 60),
query_interval: Duration::from_secs(10 * 60),
query_start_delay: Duration::from_secs(30),
priority_group_set_interval: Duration::from_secs(10 * 60),
priority_group_set_offset: Duration::from_secs(5 * 60),
// Kademlia's default time-to-live for Dht records is 36h, republishing records every
// 24h through libp2p-kad. Given that a node could restart at any point in time, one
// cannot depend on the republishing process, thus publishing one's own external addresses
// should happen on an interval < 36h.
max_publish_interval: Duration::from_secs(1 * 60 * 60),
// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current and next authorities is a
// trade-off between efficiency and performance.
//
// Querying 700 [`AuthorityId`]s takes ~8m on the Kusama DHT (16th Nov 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`.
max_query_interval: Duration::from_secs(10 * 60),
}
}
}
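As a usage sketch (hypothetical, not part of the diff), a node operator embedding the worker could override one of the two maxima while keeping the other default, via struct update syntax:

use std::time::Duration;

// Shorter query maximum, e.g. for a test network; the publish
// maximum keeps its 1h default from `Default::default()`.
let config = WorkerConfig {
    max_query_interval: Duration::from_secs(60),
    ..Default::default()
};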
73 changes: 30 additions & 43 deletions client/authority-discovery/src/worker.rs
@@ -14,17 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::{Error, Result}, ServicetoWorkerMsg};
use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg};

use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;

use addr_cache::AddrCache;
use async_trait::async_trait;
@@ -54,8 +53,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
#[cfg(test)]
pub mod tests;

type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;

const LOG_TARGET: &'static str = "sub-authority-discovery";

/// Name of the Substrate peerset priority group for authorities discovered through the authority
@@ -113,12 +110,12 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
dht_event_rx: DhtEventStream,

/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
publish_interval: ExpIncInterval,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: Interval,
query_interval: ExpIncInterval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: Interval,
priority_group_set_interval: ExpIncInterval,

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
@@ -153,31 +150,26 @@ where
prometheus_registry: Option<prometheus_endpoint::Registry>,
config: crate::WorkerConfig,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing
// records every 24h through libp2p-kad.
// Given that a node could restart at any point in time, one can not depend on the
// republishing process, thus publishing own external addresses should happen on an interval
// < 36h.
let publish_interval = interval_at(
Instant::now() + config.query_start_delay,
config.publish_interval,
// When a node starts up, publishing and querying might fail for various reasons, for
// example because the node is not yet fully bootstrapped on the DHT. Thus one should
// retry sooner rather than later. On the other hand, a long-running node is likely well
// connected and thus timely retries are not needed. For this reason, use an exponentially
// increasing interval for `publish_interval`, `query_interval` and
// `priority_group_set_interval` instead of a constant interval.
let publish_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_publish_interval,
);

// External addresses of remote authorities can change at any given point in time. The
// interval on which to trigger new queries for the current authorities is a trade off
// between efficiency and performance.
let query_interval_start = Instant::now() + config.query_start_delay;
let query_interval_duration = config.query_interval;
let query_interval = interval_at(query_interval_start, query_interval_duration);

// Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
// comparing `authority_discovery_authority_addresses_requested_total` and
// `authority_discovery_dht_event_received`. With that in mind set the peerset priority
// group on the same interval as the [`query_interval`] above,
// just delayed by 5 minutes by default.
let priority_group_set_interval = interval_at(
query_interval_start + config.priority_group_set_offset,
config.priority_group_set_interval,
let query_interval = ExpIncInterval::new(
Duration::from_secs(2),
config.max_query_interval,
);
let priority_group_set_interval = ExpIncInterval::new(
Duration::from_secs(2),
// Trade-off between node connection churn and connectivity. Using half of
// [`crate::WorkerConfig::max_query_interval`] to update priority group once at the
// beginning and once in the middle of each query interval.
config.max_query_interval / 2,
);

let addr_cache = AddrCache::new();
@@ -413,7 +405,7 @@
}

if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
@@ -449,6 +441,11 @@
}
},
DhtEvent::ValuePut(hash) => {
// Fast forward the exponentially increasing interval to the configured maximum. In
// case this was the first successful address publication, there is no need for a
// timely retry.
self.publish_interval.set_to_max();

if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
@@ -661,16 +658,6 @@ fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
}

fn interval_at(start: Instant, duration: Duration) -> Interval {
    let stream = futures::stream::unfold(start, move |next| {
        let time_until_next = next.saturating_duration_since(Instant::now());

        Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
    });

    Box::new(stream)
}

/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
60 changes: 0 additions & 60 deletions client/authority-discovery/src/worker/tests.rs
@@ -37,66 +37,6 @@ use substrate_test_runtime_client::runtime::Block;

use super::*;

#[test]
fn interval_at_with_start_now() {
    let start = Instant::now();

    let mut interval = interval_at(
        std::time::Instant::now(),
        std::time::Duration::from_secs(10),
    );

    futures::executor::block_on(async {
        interval.next().await;
    });

    assert!(
        Instant::now().saturating_duration_since(start) < Duration::from_secs(1),
        "Expected low resolution instant interval to fire within less than a second.",
    );
}

#[test]
fn interval_at_is_queuing_ticks() {
    let start = Instant::now();

    let interval = interval_at(start, std::time::Duration::from_millis(100));

    // Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms,
    // 3rd at 200ms).
    std::thread::sleep(Duration::from_millis(200));

    futures::executor::block_on(async {
        interval.take(3).collect::<Vec<()>>().await;
    });

    // Make sure we did not wait for more than 300ms, which would imply that `interval_at`
    // is not queuing ticks.
    assert!(
        Instant::now().saturating_duration_since(start) < Duration::from_millis(300),
        "Expect interval to /queue/ events when not polled for a while.",
    );
}

#[test]
fn interval_at_with_initial_delay() {
    let start = Instant::now();

    let mut interval = interval_at(
        std::time::Instant::now() + Duration::from_millis(100),
        std::time::Duration::from_secs(10),
    );

    futures::executor::block_on(async {
        interval.next().await;
    });

    assert!(
        Instant::now().saturating_duration_since(start) > Duration::from_millis(100),
        "Expected interval with initial delay not to fire right away.",
    );
}

#[derive(Clone)]
pub(crate) struct TestApi {
pub(crate) authorities: Vec<AuthorityId>,
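Since `interval_at` and its tests are removed above, a rough equivalent smoke test for the new `ExpIncInterval` might look like the following sketch (hypothetical, not part of this diff; it reuses the imports of the removed tests):

#[test]
fn exp_inc_interval_respects_start_duration() {
    let start = Instant::now();

    let mut interval = ExpIncInterval::new(
        Duration::from_millis(100),
        Duration::from_secs(10),
    );

    futures::executor::block_on(async {
        interval.next().await;
    });

    assert!(
        Instant::now().saturating_duration_since(start) >= Duration::from_millis(100),
        "Expected the first tick to wait for the full start duration.",
    );
}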
6 changes: 3 additions & 3 deletions client/network/src/discovery.rs
@@ -693,7 +693,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Err(e) => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
@@ -704,7 +704,7 @@
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)),
Err(e) => {
warn!(target: "sub-libp2p",
debug!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
@@ -716,7 +716,7 @@
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
Err(e) => warn!(target: "sub-libp2p",
Err(e) => debug!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}