Skip to content

Commit

Permalink
Merge branch 'master' into no-run-title-as-command
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger authored Jan 16, 2023
2 parents 458aa53 + e3c7023 commit 341d096
Show file tree
Hide file tree
Showing 31 changed files with 1,229 additions and 1,367 deletions.
7 changes: 7 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
[issue 2217]: https://github.com/libp2p/rust-libp2p/issues/2217
[PR 3214]: https://github.com/libp2p/rust-libp2p/pull/3214

# 0.8.1

- Skip unparsable multiaddr in `InboundUpgrade::upgrade_inbound` and
`OutboundUpgrade::upgrade_outbound`. See [PR 3300].

[PR 3300]: https://github.com/libp2p/rust-libp2p/pull/3300

# 0.8.0

- Update to `prost-codec` `v0.3.0`.
Expand Down
27 changes: 2 additions & 25 deletions protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
//! [`ConnectionHandler`] handling direct connection upgraded through a relayed connection.

use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
SubstreamProtocol,
};
use std::task::{Context, Poll};
use void::Void;
Expand Down Expand Up @@ -62,31 +62,8 @@ impl ConnectionHandler for Handler {
SubstreamProtocol::new(DeniedUpgrade, ())
}

fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
) {
}

fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
}

fn on_behaviour_event(&mut self, _: Self::InEvent) {}

fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}

fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No
}
Expand Down
22 changes: 16 additions & 6 deletions protocols/dcutr/src/protocol/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,23 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
} else {
obs_addrs
.into_iter()
.map(Multiaddr::try_from)
.filter_map(|a| match Multiaddr::try_from(a) {
Ok(a) => Some(a),
Err(e) => {
log::debug!("Unable to parse multiaddr: {e}");
None
}
})
// Filter out relayed addresses.
.filter(|a| match a {
Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit),
Err(_) => true,
.filter(|a| {
if a.iter().any(|p| p == Protocol::P2pCircuit) {
log::debug!("Dropping relayed address {a}");
false
} else {
true
}
})
.collect::<Result<Vec<Multiaddr>, _>>()
.map_err(|_| UpgradeError::InvalidAddrs)?
.collect::<Vec<Multiaddr>>()
};

let r#type = hole_punch::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?;
Expand Down Expand Up @@ -124,6 +133,7 @@ pub enum UpgradeError {
StreamClosed,
#[error("Expected at least one address in reservation.")]
NoAddresses,
#[deprecated(since = "0.8.1", note = "Error is no longer constructed.")]
#[error("Invalid addresses.")]
InvalidAddrs,
#[error("Failed to parse response type field.")]
Expand Down
22 changes: 16 additions & 6 deletions protocols/dcutr/src/protocol/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,23 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
} else {
obs_addrs
.into_iter()
.map(Multiaddr::try_from)
.filter_map(|a| match Multiaddr::try_from(a) {
Ok(a) => Some(a),
Err(e) => {
log::debug!("Unable to parse multiaddr: {e}");
None
}
})
// Filter out relayed addresses.
.filter(|a| match a {
Ok(a) => !a.iter().any(|p| p == Protocol::P2pCircuit),
Err(_) => true,
.filter(|a| {
if a.iter().any(|p| p == Protocol::P2pCircuit) {
log::debug!("Dropping relayed address {a}");
false
} else {
true
}
})
.collect::<Result<Vec<Multiaddr>, _>>()
.map_err(|_| UpgradeError::InvalidAddrs)?
.collect::<Vec<Multiaddr>>()
};

let msg = HolePunch {
Expand Down Expand Up @@ -128,6 +137,7 @@ pub enum UpgradeError {
NoAddresses,
#[error("Invalid expiration timestamp in reservation.")]
InvalidReservationExpiration,
#[deprecated(since = "0.8.1", note = "Error is no longer constructed.")]
#[error("Invalid addresses in reservation.")]
InvalidAddrs,
#[error("Failed to parse response type field.")]
Expand Down
7 changes: 3 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,7 @@ impl From<MessageAuthenticity> for PublishConfig {
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Gossipsub<
D: DataTransform = IdentityTransform,
F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
> {
pub struct Gossipsub<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Configuration providing gossipsub performance parameters.
config: GossipsubConfig,

Expand Down Expand Up @@ -1419,6 +1416,8 @@ where
peer_score.add_penalty(peer_id, 1);

// check the flood cutoff
// See: https://github.com/rust-lang/rust-clippy/issues/10061
#[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
let flood_cutoff = (backoff_time
+ self.config.graft_flood_threshold())
- self.config.prune_backoff();
Expand Down
2 changes: 1 addition & 1 deletion protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Behaviour {
}
}
hash_map::Entry::Vacant(_) => {
unreachable!("`inject_connection_closed` for unconnected peer.")
unreachable!("`on_connection_closed` for unconnected peer.")
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/src/handler/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl SubstreamHandler for Stream {
Stream::PendingRead(Framed::new(substream, RendezvousCodec::default()))
}

fn inject_event(self, event: Self::InEvent) -> Self {
fn on_event(self, event: Self::InEvent) -> Self {
match (event, self) {
(InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl)))
Expand Down
2 changes: 1 addition & 1 deletion protocols/rendezvous/src/handler/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl SubstreamHandler for Stream {
}))
}

fn inject_event(self, event: Self::InEvent) -> Self {
fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}

Expand Down
71 changes: 37 additions & 34 deletions protocols/rendezvous/src/substream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ use futures::future::{self, BoxFuture, Fuse, FusedFuture};
use futures::FutureExt;
use instant::Instant;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand All @@ -52,7 +51,7 @@ pub trait SubstreamHandler: Sized {
fn upgrade(open_info: Self::OpenInfo)
-> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo>;
fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self;
fn inject_event(self, event: Self::InEvent) -> Self;
fn on_event(self, event: Self::InEvent) -> Self;
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error>;
}

Expand Down Expand Up @@ -367,35 +366,47 @@ where
TInboundSubstreamHandler::upgrade(())
}

fn inject_fully_negotiated_inbound(
fn on_connection_event(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
_: Self::InboundOpenInfo,
) {
self.inbound_substreams.insert(
self.next_inbound_substream_id.fetch_and_increment(),
TInboundSubstreamHandler::new(protocol, ()),
);
}

fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol, ..
}) => {
self.inbound_substreams.insert(
self.next_inbound_substream_id.fetch_and_increment(),
TInboundSubstreamHandler::new(protocol, ()),
);
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) => {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
}
// TODO: Handle upgrade errors properly
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::DialUpgradeError(_) => {}
}
}

fn inject_event(&mut self, event: Self::InEvent) {
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info),
InEvent::NotifyInboundSubstream { id, message } => {
match self.inbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.inject_event(message);
let new_handler = handler.on_event(message);

self.inbound_substreams.insert(id, new_handler);
}
Expand All @@ -407,7 +418,7 @@ where
InEvent::NotifyOutboundSubstream { id, message } => {
match self.outbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.inject_event(message);
let new_handler = handler.on_event(message);

self.outbound_substreams.insert(id, new_handler);
}
Expand All @@ -419,14 +430,6 @@ where
}
}

fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ConnectionHandlerUpgrErr<Void>,
) {
// TODO: Handle upgrade errors properly
}

fn connection_keep_alive(&self) -> KeepAlive {
// Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols.

Expand Down Expand Up @@ -537,7 +540,7 @@ impl SubstreamHandler for void::Void {
unreachable!("we should never yield a substream")
}

fn inject_event(self, event: Self::InEvent) -> Self {
fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}

Expand Down
2 changes: 1 addition & 1 deletion protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ where
Err(oneshot::Canceled) => {
// The inbound upgrade has errored or timed out reading
// or waiting for the request. The handler is informed
// via `inject_listen_upgrade_error`.
// via `on_connection_event` call with `ConnectionEvent::ListenUpgradeError`.
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions swarm-derive/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# 0.31.0
# 0.32.0 [unreleased]

- Replace `NetworkBehaviour` Derive macro deprecated `inject_*` method implementations
with the new `on_swarm_event` and `on_connection_handler_event`.
See [PR 3011].
See [PR 3011] and [PR 3264].

[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264

# 0.31.0

- Add `prelude` configuration option.
The derive-macro generates code that needs to refer to various symbols. See [PR 3055].

- Update `rust-version` to reflect the actual MSRV: 1.60.0. See [PR 3090].

[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090

Expand Down
2 changes: 1 addition & 1 deletion swarm-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-swarm-derive"
edition = "2021"
rust-version = "1.60.0"
description = "Procedural macros of libp2p-swarm"
version = "0.31.0"
version = "0.32.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
Loading

0 comments on commit 341d096

Please sign in to comment.