Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rendezvous): rewrite using libp2p-request-response #4051

Merged
merged 37 commits into from
Jun 14, 2023
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9c694dc
response_request::Codec implementation
dgarus May 31, 2023
9da802f
Rendezvous based on request_response behaviour
dgarus Jun 8, 2023
8c80b08
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 8, 2023
7fdfe43
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 9, 2023
2a794e0
Fix test failure
dgarus Jun 9, 2023
386b503
Fixing clippy fail
dgarus Jun 9, 2023
59663ef
Fixing clippy fail
dgarus Jun 9, 2023
1e738f8
Fixing clippy fail
dgarus Jun 9, 2023
ed26e82
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 12, 2023
c0751f9
Update protocols/rendezvous/Cargo.toml
dgarus Jun 13, 2023
4ce78c5
Update protocols/rendezvous/src/codec.rs
dgarus Jun 13, 2023
111b640
Fixed review comments
dgarus Jun 13, 2023
2990635
Fixed review comments
dgarus Jun 13, 2023
3d7c5ee
Fixed review comments
dgarus Jun 13, 2023
a8fa740
Fixed review comments
dgarus Jun 13, 2023
1656e29
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
a33519a
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
20dd3a3
Fixed review comments
dgarus Jun 13, 2023
fe3d347
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
7deff18
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
677fafc
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
5d0cfdc
Update protocols/rendezvous/src/client.rs
dgarus Jun 13, 2023
1d2716b
Use combinator for handling EOF
thomaseizinger Jun 13, 2023
75f9e0f
Handle OutboundFailure for register and discover requests
thomaseizinger Jun 13, 2023
1a615c4
Fixed review comments
dgarus Jun 14, 2023
01965ac
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 14, 2023
6b64e22
Merge remote-tracking branch 'origin/3878-rendezvous-based-on-req-res…
dgarus Jun 14, 2023
9a6e0dd
Fixed the server's behaviour fn poll()
dgarus Jun 14, 2023
15921f6
Fixed fmt fail
dgarus Jun 14, 2023
eb99764
Refactoring
dgarus Jun 14, 2023
b71b961
Refactoring server
dgarus Jun 14, 2023
dfeb11e
Removed the recursion
dgarus Jun 14, 2023
0a1e48a
fn poll is wrapped into the loop
dgarus Jun 14, 2023
58e75f6
Got rid of RendezvousCodec
dgarus Jun 14, 2023
ba058d5
fixed fmt
dgarus Jun 14, 2023
216cf1c
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 14, 2023
d4c65d4
Fixed review comment
dgarus Jun 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 73 additions & 34 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
// DEALINGS IN THE SOFTWARE.

use crate::codec::Message::*;
use crate::codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl};
use crate::codec::{
Codec, Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl,
};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -237,56 +239,93 @@ impl NetworkBehaviour for Behaviour {
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.error_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
use libp2p_request_response as req_res;

loop {
if let Some(event) = self.error_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

let result = self.inner.poll(cx, params).map(|to_swarm| {
to_swarm.map_out(|event| match event {
libp2p_request_response::Event::Message {
peer: _,
match self.inner.poll(cx, params) {
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
message:
libp2p_request_response::Message::Response {
req_res::Message::Response {
request_id,
response,
},
} => self.handle_response(&request_id, response),
libp2p_request_response::Event::OutboundFailure {
peer: _,
..
})) => {
let event = self.handle_response(&request_id, response);

return Poll::Ready(ToSwarm::GenerateEvent(event));
}
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
request_id,
error: _,
} => {
let (rendezvous_node, namespace) = self
.waiting_for_register
.remove(&request_id)
.unwrap_or_else(|| panic!("unknown request_id: {request_id}"));
Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
})
..
})) => {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(&request_id)
{
return Poll::Ready(ToSwarm::GenerateEvent(Event::RegisterFailed(
RegisterError::Remote {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
},
)));
};

if let Some((rendezvous_node, namespace)) =
self.waiting_for_discovery.remove(&request_id)
{
return Poll::Ready(ToSwarm::GenerateEvent(Event::DiscoverFailed {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
}));
};

continue; // not a request we care about
}
libp2p_request_response::Event::InboundFailure { .. } | libp2p_request_response::Event::ResponseSent { .. } | libp2p_request_response::Event::Message {
peer: _,
message: libp2p_request_response::Message::Request { .. },
} => {
Poll::Ready(ToSwarm::GenerateEvent(
req_res::Event::InboundFailure { .. }
| req_res::Event::ResponseSent { .. }
| req_res::Event::Message {
message: req_res::Message::Request { .. },
..
},
)) => {
unreachable!("rendezvous clients never receive requests")
}
})
});
Poll::Ready(
other @ (ToSwarm::ExternalAddrConfirmed(_)
| ToSwarm::ExternalAddrExpired(_)
| ToSwarm::NewExternalAddrCandidate(_)
| ToSwarm::NotifyHandler { .. }
| ToSwarm::Dial { .. }
| ToSwarm::CloseConnection { .. }
| ToSwarm::ListenOn { .. }
| ToSwarm::RemoveListener { .. }),
) => {
let new_to_swarm =
other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));

return Poll::Ready(new_to_swarm);
}
Poll::Pending => {}
dgarus marked this conversation as resolved.
Show resolved Hide resolved
}

if result.is_pending() {
if let Some(expired_registration) =
futures::ready!(self.expiring_registrations.poll_next_unpin(cx))
if let Poll::Ready(Some(expired_registration)) =
self.expiring_registrations.poll_next_unpin(cx)
{
self.discovered_peers.remove(&expired_registration);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
peer: expired_registration.0,
}));
}
}

result
return Poll::Pending;
}
}

fn handle_pending_outbound_connection(
Expand Down