Skip to content

Commit

Permalink
protocols/plaintext: Do not UVI encode goodput (#1765)
Browse files Browse the repository at this point in the history
* protocols/plaintext: Do not UVI encode goodput

Only prefix handshake messages with the message length in bytes as an
unsigned varint. Return a plain socket once handshaking succeeded.

* *: Bump minor version in changelogs and cargo tomls
  • Loading branch information
mxinden authored Sep 17, 2020
1 parent 3c72b07 commit 87f3cb8
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 75 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)

# Version 0.28.2 [unreleased]
# Version 0.29.0 [unreleased]

- Update `libp2p-core`, `libp2p-gossipsub`, `libp2p-mplex`, `libp2p-noise`, `libp2p-websocket` and `parity-multiaddr`.
- Update `libp2p-core`, `libp2p-gossipsub`, `libp2p-mplex`, `libp2p-noise`,
`libp2p-plaintext`, `libp2p-websocket` and `parity-multiaddr`.

# Version 0.28.1 [2020-09-10]

Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.28.2"
version = "0.29.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -71,7 +71,7 @@ libp2p-kad = { version = "0.23.1", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.22.1", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.24.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.22.1", path = "protocols/plaintext", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.3.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.22.0", path = "swarm" }
Expand Down
6 changes: 5 additions & 1 deletion protocols/plaintext/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# 0.22.1 [unreleased]
# 0.23.0 [unreleased]

- Improve error logging
[PR 1759](https://github.com/libp2p/rust-libp2p/pull/1759).

- Update dependencies.

- Only prefix handshake messages with the message length in bytes as an unsigned
varint. Return a plain socket once handshaking succeeded. See [issue
1760](https://github.com/libp2p/rust-libp2p/issues/1760) for details.

# 0.22.0 [2020-09-09]

- Bump `libp2p-core` dependency.
Expand Down
3 changes: 1 addition & 2 deletions protocols/plaintext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-plaintext"
edition = "2018"
description = "Plaintext encryption dummy protocol for libp2p"
version = "0.22.1"
version = "0.23.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -16,7 +16,6 @@ futures_codec = "0.4.0"
libp2p-core = { version = "0.22.0", path = "../../core" }
log = "0.4.8"
prost = "0.6.1"
rw-stream-sink = "0.2.0"
unsigned-varint = { version = "0.5.1", features = ["futures-codec"] }
void = "1.0.2"

Expand Down
10 changes: 5 additions & 5 deletions protocols/plaintext/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,21 @@ impl HandshakeContext<Local> {
}

pub async fn handshake<S>(socket: S, config: PlainText2Config)
-> Result<(Framed<S, UviBytes<BytesMut>>, Remote), PlainTextError>
-> Result<(S, Remote), PlainTextError>
where
S: AsyncRead + AsyncWrite + Send + Unpin,
{
// The handshake messages all start with a variable-length integer indicating the size.
let mut socket = Framed::new(socket, UviBytes::default());
let mut framed_socket = Framed::new(socket, UviBytes::default());

trace!("starting handshake");
let context = HandshakeContext::new(config)?;

trace!("sending exchange to remote");
socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?;
framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?;

trace!("receiving the remote's exchange");
let context = match socket.next().await {
let context = match framed_socket.next().await {
Some(p) => context.with_remote(p?)?,
None => {
debug!("unexpected eof while waiting for remote's exchange");
Expand All @@ -135,5 +135,5 @@ where
};

trace!("received exchange from remote; pubkey = {:?}", context.state.public_key);
Ok((socket, context.state))
Ok((framed_socket.into_inner(), context.state))
}
71 changes: 8 additions & 63 deletions protocols/plaintext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::error::PlainTextError;
use crate::handshake::Remote;

use bytes::BytesMut;
use futures::future::{self, Ready};
use futures::prelude::*;
use futures::{future::BoxFuture, Sink, Stream};
use futures_codec::Framed;
use futures::future::BoxFuture;
use libp2p_core::{
identity,
InboundUpgrade,
Expand All @@ -35,9 +32,7 @@ use libp2p_core::{
PublicKey,
};
use log::debug;
use rw_stream_sink::RwStreamSink;
use std::{io, iter, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
use void::Void;

mod error;
Expand Down Expand Up @@ -153,76 +148,26 @@ impl PlainText2Config {
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
debug!("Starting plaintext handshake.");
let (stream_sink, remote) = PlainTextMiddleware::handshake(socket, self).await?;
let (socket, remote) = handshake::handshake(socket, self).await?;
debug!("Finished plaintext handshake.");

Ok((
remote.peer_id,
PlainTextOutput {
stream: RwStreamSink::new(stream_sink),
socket,
remote_key: remote.public_key,
}
))
}
}

pub struct PlainTextMiddleware<S> {
inner: Framed<S, UviBytes<BytesMut>>,
}

impl<S> PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite + Send + Unpin,
{
async fn handshake(socket: S, config: PlainText2Config)
-> Result<(PlainTextMiddleware<S>, Remote), PlainTextError>
{
let (inner, remote) = handshake::handshake(socket, config).await?;
Ok((PlainTextMiddleware { inner }, remote))
}
}

impl<S> Sink<BytesMut> for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::poll_ready(Pin::new(&mut self.inner), cx)
}

fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> {
Sink::start_send(Pin::new(&mut self.inner), item)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::poll_flush(Pin::new(&mut self.inner), cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.inner), cx)
}
}

impl<S> Stream for PlainTextMiddleware<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
type Item = Result<BytesMut, io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Stream::poll_next(Pin::new(&mut self.inner), cx)
}
}

/// Output of the plaintext protocol.
pub struct PlainTextOutput<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
/// The plaintext stream.
pub stream: RwStreamSink<PlainTextMiddleware<S>>,
pub socket: S,
/// The public key of the remote.
pub remote_key: PublicKey,
}
Expand All @@ -231,26 +176,26 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for PlainTextOutput<S> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
-> Poll<Result<usize, io::Error>>
{
AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf)
AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
}
}

impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for PlainTextOutput<S> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
-> Poll<Result<usize, io::Error>>
{
AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf)
AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Result<(), io::Error>>
{
AsyncWrite::poll_flush(Pin::new(&mut self.stream), cx)
AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Result<(), io::Error>>
{
AsyncWrite::poll_close(Pin::new(&mut self.stream), cx)
AsyncWrite::poll_close(Pin::new(&mut self.socket), cx)
}
}

0 comments on commit 87f3cb8

Please sign in to comment.