Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rendezvous): rewrite using libp2p-request-response #4051

Merged
merged 37 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9c694dc
response_request::Codec implementation
dgarus May 31, 2023
9da802f
Rendezvous based on request_response behaviour
dgarus Jun 8, 2023
8c80b08
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 8, 2023
7fdfe43
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 9, 2023
2a794e0
Fix test failure
dgarus Jun 9, 2023
386b503
Fixing clippy fail
dgarus Jun 9, 2023
59663ef
Fixing clippy fail
dgarus Jun 9, 2023
1e738f8
Fixing clippy fail
dgarus Jun 9, 2023
ed26e82
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 12, 2023
c0751f9
Update protocols/rendezvous/Cargo.toml
dgarus Jun 13, 2023
4ce78c5
Update protocols/rendezvous/src/codec.rs
dgarus Jun 13, 2023
111b640
Fixed review comments
dgarus Jun 13, 2023
2990635
Fixed review comments
dgarus Jun 13, 2023
3d7c5ee
Fixed review comments
dgarus Jun 13, 2023
a8fa740
Fixed review comments
dgarus Jun 13, 2023
1656e29
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
a33519a
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
20dd3a3
Fixed review comments
dgarus Jun 13, 2023
fe3d347
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
7deff18
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
677fafc
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 13, 2023
5d0cfdc
Update protocols/rendezvous/src/client.rs
dgarus Jun 13, 2023
1d2716b
Use combinator for handling EOF
thomaseizinger Jun 13, 2023
75f9e0f
Handle OutboundFailure for register and discover requests
thomaseizinger Jun 13, 2023
1a615c4
Fixed review comments
dgarus Jun 14, 2023
01965ac
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 14, 2023
6b64e22
Merge remote-tracking branch 'origin/3878-rendezvous-based-on-req-res…
dgarus Jun 14, 2023
9a6e0dd
Fixed the server's behaviour fn poll()
dgarus Jun 14, 2023
15921f6
Fixed fmt fail
dgarus Jun 14, 2023
eb99764
Refactoring
dgarus Jun 14, 2023
b71b961
Refactoring server
dgarus Jun 14, 2023
dfeb11e
Removed the recursion
dgarus Jun 14, 2023
0a1e48a
fn poll is wrapped into the loop
dgarus Jun 14, 2023
58e75f6
Got rid of RendezvousCodec
dgarus Jun 14, 2023
ba058d5
fixed fmt
dgarus Jun 14, 2023
216cf1c
Merge branch 'master' into 3878-rendezvous-based-on-req-resp
dgarus Jun 14, 2023
d4c65d4
Fixed review comment
dgarus Jun 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 133 additions & 89 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,56 +237,77 @@ impl NetworkBehaviour for Behaviour {
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
use libp2p_request_response as req_res;

if let Some(event) = self.error_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

let result = self.inner.poll(cx, params).map(|to_swarm| {
to_swarm.map_out(|event| match event {
libp2p_request_response::Event::Message {
peer: _,
loop {
match self.inner.poll(cx, params) {
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
message:
libp2p_request_response::Message::Response {
req_res::Message::Response {
request_id,
response,
},
} => self.handle_response(&request_id, response),
libp2p_request_response::Event::OutboundFailure {
peer: _,
..
})) => {
if let Some(event) = self.handle_response(&request_id, response) {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

continue; // not a request we care about
}
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
request_id,
error: _,
} => {
let (rendezvous_node, namespace) = self
.waiting_for_register
.remove(&request_id)
.unwrap_or_else(|| panic!("unknown request_id: {request_id}"));
Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
})
..
})) => {
if let Some(event) = self.event_for_outbound_failure(&request_id) {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

continue; // not a request we care about
}
libp2p_request_response::Event::InboundFailure { .. } | libp2p_request_response::Event::ResponseSent { .. } | libp2p_request_response::Event::Message {
peer: _,
message: libp2p_request_response::Message::Request { .. },
} => {
Poll::Ready(ToSwarm::GenerateEvent(
req_res::Event::InboundFailure { .. }
| req_res::Event::ResponseSent { .. }
| req_res::Event::Message {
message: req_res::Message::Request { .. },
..
},
)) => {
unreachable!("rendezvous clients never receive requests")
}
})
});
Poll::Ready(
other @ (ToSwarm::ExternalAddrConfirmed(_)
| ToSwarm::ExternalAddrExpired(_)
| ToSwarm::NewExternalAddrCandidate(_)
| ToSwarm::NotifyHandler { .. }
| ToSwarm::Dial { .. }
| ToSwarm::CloseConnection { .. }
| ToSwarm::ListenOn { .. }
| ToSwarm::RemoveListener { .. }),
) => {
let new_to_swarm =
other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));

return Poll::Ready(new_to_swarm);
}
Poll::Pending => {}
dgarus marked this conversation as resolved.
Show resolved Hide resolved
}

if result.is_pending() {
if let Some(expired_registration) =
futures::ready!(self.expiring_registrations.poll_next_unpin(cx))
if let Poll::Ready(Some(expired_registration)) =
self.expiring_registrations.poll_next_unpin(cx)
{
self.discovered_peers.remove(&expired_registration);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
peer: expired_registration.0,
}));
}
}

result
return Poll::Pending;
}
}

fn handle_pending_outbound_connection(
Expand Down Expand Up @@ -314,73 +335,96 @@ impl NetworkBehaviour for Behaviour {
}

impl Behaviour {
fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Event {
fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
}));
};

if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
return Some(Event::DiscoverFailed {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
});
};

None
}

fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option<Event> {
match response {
RegisterResponse(result) => {
let (rendezvous_node, namespace) = self
.waiting_for_register
.remove(request_id)
.unwrap_or_else(|| panic!("unknown request_id: {request_id}"));
match result {
Ok(ttl) => Event::Registered {
rendezvous_node,
ttl,
namespace,
},
Err(error_code) => Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: error_code,
}),
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
return Some(match result {
dgarus marked this conversation as resolved.
Show resolved Hide resolved
Ok(ttl) => Event::Registered {
rendezvous_node,
ttl,
namespace,
},
Err(error_code) => Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: error_code,
}),
});
}

None
}
DiscoverResponse(response) => {
let (rendezvous_node, ns) = self
.waiting_for_discovery
.remove(request_id)
.unwrap_or_else(|| panic!("unknown request_id: {request_id}"));
let res = match response {
Ok((registrations, cookie)) => {
self.discovered_peers
.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();

let addresses = registration.record.addresses().to_vec();

((peer_id, namespace), addresses)
}));

self.expiring_registrations
.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(
registration.ttl,
))
.await;

(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));

Event::Discovered {
rendezvous_node,
registrations,
cookie,
if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
let res = match response {
Ok((registrations, cookie)) => {
self.discovered_peers.extend(registrations.iter().map(
dgarus marked this conversation as resolved.
Show resolved Hide resolved
|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();

let addresses = registration.record.addresses().to_vec();

((peer_id, namespace), addresses)
},
));

self.expiring_registrations
.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(
registration.ttl,
))
.await;

(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));

Event::Discovered {
rendezvous_node,
registrations,
cookie,
}
}
}
Err(error_code) => Event::DiscoverFailed {
rendezvous_node,
namespace: ns,
error: error_code,
},
};
Err(error_code) => Event::DiscoverFailed {
rendezvous_node,
namespace: ns,
error: error_code,
},
};

return Some(res);
}

res
None
}
_ => unreachable!(),
_ => unreachable!("rendezvous clients never receive requests"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't safe. We cannot statically ensure that we don't receive a request, the remote could send us any message.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In follow-up work, we could redesign this to have two enums: one for requests and one for responses. Then our codec could ensure that and inbound message on the client is always a response.

}
}
}
19 changes: 10 additions & 9 deletions protocols/rendezvous/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,12 @@ impl libp2p_request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
if let Some(result) = FramedRead::new(io, RendezvousCodec::default()).next().await {
return Ok(result?);
}
let message = FramedRead::new(io, RendezvousCodec::default())
.next()
.await
.ok_or(io::ErrorKind::UnexpectedEof)??;

Err(io::ErrorKind::InvalidInput.into())
Ok(message)
dgarus marked this conversation as resolved.
Show resolved Hide resolved
}

async fn read_response<T>(
Expand All @@ -271,12 +272,12 @@ impl libp2p_request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
let mut frame = FramedRead::new(io, RendezvousCodec::default());
if let Some(result) = frame.next().await {
return Ok(result?);
}
let message = FramedRead::new(io, RendezvousCodec::default())
.next()
.await
.ok_or(io::ErrorKind::UnexpectedEof)??;

Err(io::ErrorKind::InvalidInput.into())
Ok(message)
}

async fn write_request<T>(
Expand Down
Loading