Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core)!: remove EitherOutput #3341

Merged
merged 29 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3a84c14
Use single quotes to avoid running commands
thomaseizinger Jan 11, 2023
458aa53
Use intermediary variable
thomaseizinger Jan 13, 2023
341d096
Merge branch 'master' into no-run-title-as-command
thomaseizinger Jan 16, 2023
aed5363
refactor(relay): introduce `Handler::new` functions (#3326)
thomaseizinger Jan 16, 2023
5e97d0e
fix: revert "refactor(relay): introduce `Handler::new` functions" (#3…
thomaseizinger Jan 17, 2023
da97e8f
Replace `EitherError` with `Either`
thomaseizinger Jan 17, 2023
2ce805d
Add changelog entry
thomaseizinger Jan 17, 2023
0a2729d
Remove `EitherTransport` in favor of `Either`
thomaseizinger Jan 17, 2023
3397266
Update core/CHANGELOG.md
thomaseizinger Jan 17, 2023
5802177
Merge branch '2650-remove-either-error' into 2650-remove-either-trans…
thomaseizinger Jan 17, 2023
7060f4b
Add changelog entry
thomaseizinger Jan 17, 2023
82eb6c2
Merge branch 'no-run-title-as-command' into 2650-remove-either-error
thomaseizinger Jan 17, 2023
955ff80
Merge branch 'master' into no-run-title-as-command
thomaseizinger Jan 17, 2023
bb84906
Merge branch 'no-run-title-as-command' into 2650-remove-either-error
thomaseizinger Jan 17, 2023
fdfaf72
Merge branch '2650-remove-either-error' into 2650-remove-either-trans…
thomaseizinger Jan 17, 2023
59f6465
Update changelog entry
thomaseizinger Jan 17, 2023
8872039
Remove `EitherUpgrade`
thomaseizinger Jan 17, 2023
d3e79cf
Update core/CHANGELOG.md
thomaseizinger Jan 17, 2023
1526051
Remove `EitherFuture2`
thomaseizinger Jan 17, 2023
1b25ec1
add changelog entry
thomaseizinger Jan 17, 2023
4164aa8
Merge branch '2650-remove-either-upgrade' into 2650-remove-either-fut…
thomaseizinger Jan 17, 2023
0c37e77
WIP: Remove `EitherOutput`
thomaseizinger Jan 17, 2023
37215eb
Merge branch 'master' into 2650-remove-either-output
thomaseizinger Jan 18, 2023
6301ff3
Make it compile
thomaseizinger Jan 18, 2023
04e0682
Fix rustdoc link
thomaseizinger Jan 18, 2023
aa377be
Add changelog entry
thomaseizinger Jan 18, 2023
ff64c4d
Merge branch 'master' into 2650-remove-either-output
thomaseizinger Jan 19, 2023
79b80dd
Merge branch 'master' into 2650-remove-either-output
mergify[bot] Jan 23, 2023
7218b93
Merge branch 'master' into 2650-remove-either-output
mergify[bot] Jan 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 46 additions & 161 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,167 +25,30 @@ use crate::{
Multiaddr, ProtocolName,
};
use either::Either;
use futures::{
io::{IoSlice, IoSliceMut},
prelude::*,
};
use futures::prelude::*;
use pin_project::pin_project;
use std::{io, pin::Pin, task::Context, task::Poll};

/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
#[pin_project(project = EitherOutputProj)]
#[derive(Debug, Copy, Clone)]
pub enum EitherOutput<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<A, B> AsyncRead for EitherOutput<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.project() {
EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf),
EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf),
}
}
use std::{pin::Pin, task::Context, task::Poll};

fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
match self.project() {
EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
}
}
}

impl<A, B> AsyncWrite for EitherOutput<A, B>
where
A: AsyncWrite,
B: AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match self.project() {
EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf),
EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf),
}
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
match self.project() {
EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.project() {
EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx),
EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.project() {
EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx),
EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx),
}
}
}

impl<A, B, I> Stream for EitherOutput<A, B>
where
A: TryStream<Ok = I>,
B: TryStream<Ok = I>,
{
type Item = Result<I, Either<A::Error, B::Error>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherOutputProj::First(a) => {
TryStream::try_poll_next(a, cx).map(|v| v.map(|r| r.map_err(Either::Left)))
}
EitherOutputProj::Second(b) => {
TryStream::try_poll_next(b, cx).map(|v| v.map(|r| r.map_err(Either::Right)))
}
}
}
}

impl<A, B, I> Sink<I> for EitherOutput<A, B>
where
A: Sink<I>,
B: Sink<I>,
{
type Error = Either<A::Error, B::Error>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(Either::Left),
EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(Either::Right),
}
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
match self.project() {
EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(Either::Left),
EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(Either::Right),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(Either::Left),
EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(Either::Right),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(Either::Left),
EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(Either::Right),
}
}
}

impl<A, B> StreamMuxer for EitherOutput<A, B>
impl<A, B> StreamMuxer for future::Either<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type Substream = future::Either<A::Substream, B::Substream>;
type Error = Either<A::Error, B::Error>;

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
match as_pin_mut(self) {
future::Either::Left(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_ok(future::Either::Left)
.map_err(Either::Left),
EitherOutputProj::Second(inner) => inner
future::Either::Right(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_ok(future::Either::Right)
.map_err(Either::Right),
}
}
Expand All @@ -194,32 +57,54 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
match as_pin_mut(self) {
future::Either::Left(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_ok(future::Either::Left)
.map_err(Either::Left),
EitherOutputProj::Second(inner) => inner
future::Either::Right(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_ok(future::Either::Right)
.map_err(Either::Right),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(Either::Left),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(Either::Right),
match as_pin_mut(self) {
future::Either::Left(inner) => inner.poll_close(cx).map_err(Either::Left),
future::Either::Right(inner) => inner.poll_close(cx).map_err(Either::Right),
}
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll(cx).map_err(Either::Left),
EitherOutputProj::Second(inner) => inner.poll(cx).map_err(Either::Right),
match as_pin_mut(self) {
future::Either::Left(inner) => inner.poll(cx).map_err(Either::Left),
future::Either::Right(inner) => inner.poll(cx).map_err(Either::Right),
}
}
}

/// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
/// pinned projections of the inner variants.
///
/// Local function until https://github.com/rust-lang/futures-rs/pull/2691 is merged.
fn as_pin_mut<A, B>(
Copy link
Member

@jxs jxs Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ask for project to be public? As it seems the same purpose and would allow us to not have unsafe code here.

update:
sorry only now saw rayon-rs/either#76, can't we use as_pin_mut from either::Either?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ask for project to be public? As it seems the same purpose and would allow us to not have unsafe code here.

See rust-lang/futures-rs#2629 (comment) :)

This is the future::Either type, we can't use either::Either here because it doesn't implement AsyncRead etc

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I saw after, thanks Thomas :)
It it would be nice for either::Either to implement the futures traits as you asked on rayon-rs/either#79. But it seems you also submitted rust-lang/futures-rs#2691 which will allow us to remove the unsafe code right? Ideally what would be really nice was for a single Either type 😀 which will be possible when rust-lang/futures-rs#2691 is merged right? Nonetheless thanks for your work simplifying this Thomas!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I saw after, thanks Thomas :)
It it would be nice for either::Either to implement the futures traits as you asked on rayon-rs/either#79. But it seems you also submitted rust-lang/futures-rs#2691 which will allow us to remove the unsafe code right?

Correct!

Ideally what would be really nice was for a single Either type 😀 which will be possible when rust-lang/futures-rs#2691 is merged right?

Not quite unfortunately, we also use Either in places where it needs to implement std::error::Error for example and futures::Either doesn't AFAIK. I agree that it would be nice but at least after these patches, we don't have a problem with there being two different Either types.

Nonetheless thanks for your work simplifying this Thomas!

You are welcome ☺️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of then leaving a TODO on the unsafe code to be removed when a futures gets released with rust-lang/futures-rs#2691 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of then leaving a TODO on the unsafe code to be removed when a futures gets released with rust-lang/futures-rs#2691 ?

Do you mean like line 93? 🙈

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah! right LOL 😂

either: Pin<&mut future::Either<A, B>>,
) -> future::Either<Pin<&mut A>, Pin<&mut B>> {
// SAFETY: `get_unchecked_mut` is fine because we don't move anything.
// We can use `new_unchecked` because the `inner` parts are guaranteed
// to be pinned, as they come from `self` which is pinned, and we never
// offer an unpinned `&mut L` or `&mut R` through `Pin<&mut Self>`. We
// also don't have an implementation of `Drop`, nor manual `Unpin`.
unsafe {
match *Pin::get_unchecked_mut(either) {
future::Either::Left(ref mut inner) => future::Either::Left(Pin::new_unchecked(inner)),
future::Either::Right(ref mut inner) => {
future::Either::Right(Pin::new_unchecked(inner))
}
}
}
}
Expand All @@ -238,15 +123,15 @@ where
AFuture: TryFuture<Ok = AInner>,
BFuture: TryFuture<Ok = BInner>,
{
type Output = Result<EitherOutput<AInner, BInner>, Either<AFuture::Error, BFuture::Error>>;
type Output = Result<future::Either<AInner, BInner>, Either<AFuture::Error, BFuture::Error>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EitherFutureProj::First(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::First)
.map_ok(future::Either::Left)
.map_err(Either::Left),
EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::Second)
.map_ok(future::Either::Right)
.map_err(Either::Right),
}
}
Expand All @@ -272,7 +157,7 @@ where
B: Transport,
A: Transport,
{
type Output = EitherOutput<A::Output, B::Output>;
type Output = future::Either<A::Output, B::Output>;
type Error = Either<A::Error, B::Error>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;
Expand Down
5 changes: 3 additions & 2 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::either::{EitherFuture, EitherOutput};
use crate::either::EitherFuture;
use crate::transport::{ListenerId, Transport, TransportError, TransportEvent};
use either::Either;
use futures::future;
use multiaddr::Multiaddr;
use std::{pin::Pin, task::Context, task::Poll};

Expand All @@ -40,7 +41,7 @@ where
B: Transport,
A: Transport,
{
type Output = EitherOutput<A::Output, B::Output>;
type Output = future::Either<A::Output, B::Output>;
type Error = Either<A::Error, B::Error>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;
Expand Down
7 changes: 4 additions & 3 deletions core/src/upgrade/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
either::{EitherFuture, EitherName, EitherOutput},
either::{EitherFuture, EitherName},
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
};
use either::Either;
use futures::future;

impl<A, B> UpgradeInfo for Either<A, B>
where
Expand All @@ -48,7 +49,7 @@ where
A: InboundUpgrade<C, Output = TA, Error = EA>,
B: InboundUpgrade<C, Output = TB, Error = EB>,
{
type Output = EitherOutput<TA, TB>;
type Output = future::Either<TA, TB>;
type Error = Either<EA, EB>;
type Future = EitherFuture<A::Future, B::Future>;

Expand All @@ -70,7 +71,7 @@ where
A: OutboundUpgrade<C, Output = TA, Error = EA>,
B: OutboundUpgrade<C, Output = TB, Error = EB>,
{
type Output = EitherOutput<TA, TB>;
type Output = future::Either<TA, TB>;
type Error = Either<EA, EB>;
type Future = EitherFuture<A::Future, B::Future>;

Expand Down
7 changes: 4 additions & 3 deletions core/src/upgrade/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

use crate::either::EitherFuture;
use crate::{
either::{EitherName, EitherOutput},
either::EitherName,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
};
use either::Either;
use futures::future;

/// Upgrade that combines two upgrades into one. Supports all the protocols supported by either
/// sub-upgrade.
Expand Down Expand Up @@ -65,7 +66,7 @@ where
A: InboundUpgrade<C, Output = TA, Error = EA>,
B: InboundUpgrade<C, Output = TB, Error = EB>,
{
type Output = EitherOutput<TA, TB>;
type Output = future::Either<TA, TB>;
type Error = Either<EA, EB>;
type Future = EitherFuture<A::Future, B::Future>;

Expand All @@ -82,7 +83,7 @@ where
A: OutboundUpgrade<C, Output = TA, Error = EA>,
B: OutboundUpgrade<C, Output = TB, Error = EB>,
{
type Output = EitherOutput<TA, TB>;
type Output = future::Either<TA, TB>;
type Error = Either<EA, EB>;
type Future = EitherFuture<A::Future, B::Future>;

Expand Down
6 changes: 3 additions & 3 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

use crate::protocol;
use either::Either;
use futures::future;
use futures::future::{BoxFuture, FutureExt};
use instant::Instant;
use libp2p_core::either::EitherOutput;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
use libp2p_core::ConnectedPoint;
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Handler {
>,
) {
match output {
EitherOutput::First(inbound_connect) => {
future::Either::Left(inbound_connect) => {
let remote_addr = match &self.endpoint {
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
Expand All @@ -187,7 +187,7 @@ impl Handler {
));
}
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
EitherOutput::Second(output) => void::unreachable(output),
future::Either::Right(output) => void::unreachable(output),
}
}

Expand Down
Loading