From a5cd0d0e03ca24ac3a458d45ec1945c31136fede Mon Sep 17 00:00:00 2001 From: Denis Garus Date: Wed, 24 May 2023 14:13:12 +0300 Subject: [PATCH] feat(request-response): add modules for `json` and `cbor` messages This patch adds two modules to `libp2p::request_response`: - `cbor` - `json` Both define a `Behaviour` type-alias that comes with a `Codec` implementation which uses the respective `serde` crate to serialize and deserialize the messages. Fixes #3905. Pull-Request: #3952. --- Cargo.lock | 14 ++ examples/file-sharing/src/network.rs | 2 +- protocols/autonat/src/behaviour.rs | 2 +- protocols/request-response/CHANGELOG.md | 7 +- protocols/request-response/Cargo.toml | 9 + protocols/request-response/src/cbor.rs | 214 +++++++++++++++++++++++ protocols/request-response/src/json.rs | 202 +++++++++++++++++++++ protocols/request-response/src/lib.rs | 29 ++- protocols/request-response/tests/ping.rs | 26 +-- 9 files changed, 489 insertions(+), 16 deletions(-) create mode 100644 protocols/request-response/src/cbor.rs create mode 100644 protocols/request-response/src/json.rs diff --git a/Cargo.lock b/Cargo.lock index 6be8ba987a3..27f94fdbc36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2980,6 +2980,7 @@ dependencies = [ "async-trait", "env_logger 0.10.0", "futures", + "futures_ringbuf", "instant", "libp2p-core", "libp2p-identity", @@ -2990,6 +2991,9 @@ dependencies = [ "libp2p-yamux", "log", "rand 0.8.5", + "serde", + "serde_cbor", + "serde_json", "smallvec", "void", ] @@ -4653,6 +4657,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_cbor" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.163" diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs index 49479bf81ed..f497d0ce299 100644 --- a/examples/file-sharing/src/network.rs +++ b/examples/file-sharing/src/network.rs @@ -60,7 +60,7 @@ pub(crate) async fn new( transport, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), - request_response: request_response::Behaviour::new( + request_response: request_response::Behaviour::with_codec( FileExchangeCodec(), iter::once(( StreamProtocol::new("/file-exchange/1"), diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 926445c9e49..439543f8318 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -222,7 +222,7 @@ impl Behaviour { let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full)); let mut cfg = request_response::Config::default(); cfg.set_request_timeout(config.timeout); - let inner = request_response::Behaviour::new(AutoNatCodec, protocols, cfg); + let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg); Self { local_peer_id, inner, diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 442906f4621..222cf59c347 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.25.0 - unreleased +- Add `request_response::json::Behaviour` and `request_response::cbor::Behaviour` building on top of the `serde` traits. + To conveniently construct these, we remove the `Codec` parameter from `Behaviour::new` and add `Behaviour::with_codec`. + See [PR 3952]. + - Raise MSRV to 1.65. See [PR 3715]. - Remove deprecated `RequestResponse` prefixed items. See [PR 3702]. @@ -8,10 +12,11 @@ These variants are no longer constructed. See [PR 3605]. -- Don't close connections if individual streams fail. +- Don't close connections if individual streams fail. Log the error instead. See [PR 3913]. +[PR 3952]: https://github.com/libp2p/rust-libp2p/pull/3952 [PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702 diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 55108a2411f..9e733c8a50b 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -18,10 +18,17 @@ libp2p-core = { workspace = true } libp2p-swarm = { workspace = true } libp2p-identity = { workspace = true } rand = "0.8" +serde = { version = "1.0", optional = true} +serde_json = { version = "1.0.96", optional = true } +serde_cbor = { version = "0.11.2", optional = true } smallvec = "1.6.1" void = "1.0.2" log = "0.4.17" +[features] +json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"] +cbor = ["dep:serde", "dep:serde_cbor", "libp2p-swarm/macros"] + [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10.0" @@ -30,6 +37,8 @@ libp2p-tcp = { workspace = true, features = ["async-io"] } libp2p-yamux = { workspace = true } rand = "0.8" libp2p-swarm-test = { workspace = true } +futures_ringbuf = "0.3.1" +serde = { version = "1.0", features = ["derive"]} # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/request-response/src/cbor.rs b/protocols/request-response/src/cbor.rs new file mode 100644 index 00000000000..b7c520c72b9 --- /dev/null +++ b/protocols/request-response/src/cbor.rs @@ -0,0 +1,214 @@ +// Copyright 2023 Protocol Labs +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +/// A request-response behaviour using [`serde_cbor`] for serializing and deserializing the messages. +/// +/// # Example +/// +/// ``` +/// # use libp2p_request_response::{cbor, ProtocolSupport, self as request_response}; +/// # use libp2p_swarm::{StreamProtocol, SwarmBuilder}; +/// #[derive(Debug, serde::Serialize, serde::Deserialize)] +/// struct GreetRequest { +/// name: String, +/// } +/// +/// #[derive(Debug, serde::Serialize, serde::Deserialize)] +/// struct GreetResponse { +/// message: String, +/// } +/// +/// let behaviour = cbor::Behaviour::::new( +/// [(StreamProtocol::new("/my-cbor-protocol"), ProtocolSupport::Full)], +/// request_response::Config::default() +/// ); +/// ``` +pub type Behaviour = crate::Behaviour>; + +mod codec { + use async_trait::async_trait; + use futures::prelude::*; + use futures::{AsyncRead, AsyncWrite}; + use libp2p_swarm::StreamProtocol; + use serde::{de::DeserializeOwned, Serialize}; + use std::{io, marker::PhantomData}; + + /// Max request size in bytes + const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024; + /// Max response size in bytes + const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024; + + pub struct Codec { + phantom: PhantomData<(Req, Resp)>, + } + + impl Default for Codec { + fn default() -> Self { + Codec { + phantom: PhantomData, + } + } + } + + impl Clone for Codec { + fn clone(&self) -> Self { + Self::default() + } + } + + #[async_trait] + impl crate::Codec for Codec + where + Req: Send + Serialize + DeserializeOwned, + Resp: Send + Serialize + DeserializeOwned, + { + type Protocol = StreamProtocol; + type Request = Req; + type Response = Resp; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?; + + serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error) + } + + async fn read_response(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?; + + serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data: Vec = serde_cbor::to_vec(&req).map_err(into_io_error)?; + + io.write_all(data.as_ref()).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + resp: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data: Vec = serde_cbor::to_vec(&resp).map_err(into_io_error).unwrap(); + + io.write_all(data.as_ref()).await?; + + Ok(()) + } + } + + fn into_io_error(err: serde_cbor::Error) -> io::Error { + if err.is_syntax() || err.is_data() { + return io::Error::new(io::ErrorKind::InvalidData, err); + } + + if err.is_eof() { + return io::Error::new(io::ErrorKind::UnexpectedEof, err); + } + + io::Error::new(io::ErrorKind::Other, err) + } +} + +#[cfg(test)] +mod tests { + use crate::cbor::codec::Codec; + use crate::Codec as _; + use futures::AsyncWriteExt; + use futures_ringbuf::Endpoint; + use libp2p_swarm::StreamProtocol; + use serde::{Deserialize, Serialize}; + + #[async_std::test] + async fn test_codec() { + let expected_request = TestRequest { + payload: "test_payload".to_string(), + }; + let expected_response = TestResponse { + payload: "test_payload".to_string(), + }; + let protocol = StreamProtocol::new("/test_cbor/1"); + let mut codec = Codec::default(); + + let (mut a, mut b) = Endpoint::pair(124, 124); + codec + .write_request(&protocol, &mut a, expected_request.clone()) + .await + .expect("Should write request"); + a.close().await.unwrap(); + + let actual_request = codec + .read_request(&protocol, &mut b) + .await + .expect("Should read request"); + b.close().await.unwrap(); + + assert_eq!(actual_request, expected_request); + + let (mut a, mut b) = Endpoint::pair(124, 124); + codec + .write_response(&protocol, &mut a, expected_response.clone()) + .await + .expect("Should write response"); + a.close().await.unwrap(); + + let actual_response = codec + .read_response(&protocol, &mut b) + .await + .expect("Should read response"); + b.close().await.unwrap(); + + assert_eq!(actual_response, expected_response); + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + struct TestRequest { + payload: String, + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + struct TestResponse { + payload: String, + } +} diff --git a/protocols/request-response/src/json.rs b/protocols/request-response/src/json.rs new file mode 100644 index 00000000000..0b3d634573b --- /dev/null +++ b/protocols/request-response/src/json.rs @@ -0,0 +1,202 @@ +// Copyright 2023 Protocol Labs +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +/// A request-response behaviour using [`serde_json`] for serializing and deserializing the messages. +/// +/// # Example +/// +/// ``` +/// # use libp2p_request_response::{json, ProtocolSupport, self as request_response}; +/// # use libp2p_swarm::{StreamProtocol}; +/// #[derive(Debug, serde::Serialize, serde::Deserialize)] +/// struct GreetRequest { +/// name: String, +/// } +/// +/// #[derive(Debug, serde::Serialize, serde::Deserialize)] +/// struct GreetResponse { +/// message: String, +/// } +/// +/// let behaviour = json::Behaviour::::new( +/// [(StreamProtocol::new("/my-json-protocol"), ProtocolSupport::Full)], +/// request_response::Config::default() +/// ); +/// ``` +pub type Behaviour = crate::Behaviour>; + +mod codec { + use async_trait::async_trait; + use futures::prelude::*; + use futures::{AsyncRead, AsyncWrite}; + use libp2p_swarm::StreamProtocol; + use serde::{de::DeserializeOwned, Serialize}; + use std::{io, marker::PhantomData}; + + /// Max request size in bytes + const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024; + /// Max response size in bytes + const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024; + + pub struct Codec { + phantom: PhantomData<(Req, Resp)>, + } + + impl Default for Codec { + fn default() -> Self { + Codec { + phantom: PhantomData, + } + } + } + + impl Clone for Codec { + fn clone(&self) -> Self { + Self::default() + } + } + + #[async_trait] + impl crate::Codec for Codec + where + Req: Send + Serialize + DeserializeOwned, + Resp: Send + Serialize + DeserializeOwned, + { + type Protocol = StreamProtocol; + type Request = Req; + type Response = Resp; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?; + + Ok(serde_json::from_slice(vec.as_slice())?) + } + + async fn read_response(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut vec = Vec::new(); + + io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?; + + Ok(serde_json::from_slice(vec.as_slice())?) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data = serde_json::to_vec(&req)?; + + io.write_all(data.as_ref()).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + resp: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let data = serde_json::to_vec(&resp)?; + + io.write_all(data.as_ref()).await?; + + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use crate::Codec; + use futures::AsyncWriteExt; + use futures_ringbuf::Endpoint; + use libp2p_swarm::StreamProtocol; + use serde::{Deserialize, Serialize}; + + #[async_std::test] + async fn test_codec() { + let expected_request = TestRequest { + payload: "test_payload".to_string(), + }; + let expected_response = TestResponse { + payload: "test_payload".to_string(), + }; + let protocol = StreamProtocol::new("/test_json/1"); + let mut codec: super::codec::Codec = + super::codec::Codec::default(); + + let (mut a, mut b) = Endpoint::pair(124, 124); + codec + .write_request(&protocol, &mut a, expected_request.clone()) + .await + .expect("Should write request"); + a.close().await.unwrap(); + + let actual_request = codec + .read_request(&protocol, &mut b) + .await + .expect("Should read request"); + b.close().await.unwrap(); + + assert_eq!(actual_request, expected_request); + + let (mut a, mut b) = Endpoint::pair(124, 124); + codec + .write_response(&protocol, &mut a, expected_response.clone()) + .await + .expect("Should write response"); + a.close().await.unwrap(); + + let actual_response = codec + .read_response(&protocol, &mut b) + .await + .expect("Should read response"); + b.close().await.unwrap(); + + assert_eq!(actual_response, expected_response); + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + struct TestRequest { + payload: String, + } + + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + struct TestResponse { + payload: String, + } +} diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 1c3f4cf725f..7b1a8088443 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -28,7 +28,7 @@ //! over the actual messages being sent, which are defined in terms of a //! [`Codec`]. Creating a request/response protocol thus amounts //! to providing an implementation of this trait which can then be -//! given to [`Behaviour::new`]. Further configuration options are +//! given to [`Behaviour::with_codec`]. Further configuration options are //! available via the [`Config`]. //! //! Requests are sent using [`Behaviour::send_request`] and the @@ -39,6 +39,14 @@ //! receiving a [`Message::Request`] via //! [`Event::Message`]. //! +//! ## Predefined codecs +//! +//! In case your message types implement [`serde::Serialize`] and [`serde::Deserialize`], +//! you can use two predefined behaviours: +//! +//! - [`cbor::Behaviour`] for CBOR-encoded messages +//! - [`json::Behaviour`] for JSON-encoded messages +//! //! ## Protocol Families //! //! A single [`Behaviour`] instance can be used with an entire @@ -58,8 +66,12 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#[cfg(feature = "cbor")] +pub mod cbor; mod codec; mod handler; +#[cfg(feature = "json")] +pub mod json; pub use codec::Codec; pub use handler::ProtocolSupport; @@ -328,13 +340,26 @@ where pending_outbound_requests: HashMap; 10]>>, } +impl Behaviour +where + TCodec: Codec + Default + Clone + Send + 'static, +{ + /// Creates a new `Behaviour` for the given protocols and configuration, using [`Default`] to construct the codec. + pub fn new(protocols: I, cfg: Config) -> Self + where + I: IntoIterator, + { + Self::with_codec(TCodec::default(), protocols, cfg) + } +} + impl Behaviour where TCodec: Codec + Clone + Send + 'static, { /// Creates a new `Behaviour` for the given /// protocols, codec and configuration. - pub fn new(codec: TCodec, protocols: I, cfg: Config) -> Self + pub fn with_codec(codec: TCodec, protocols: I, cfg: Config) -> Self where I: IntoIterator, { diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 48860b5887f..42baf0cf773 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -43,8 +43,9 @@ async fn is_response_outbound() { )); let cfg = request_response::Config::default(); - let mut swarm1 = - Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg)); + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::with_codec(PingCodec(), protocols, cfg) + }); let request_id1 = swarm1 .behaviour_mut() @@ -87,11 +88,12 @@ async fn ping_protocol() { let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(PingCodec(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg)); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::with_codec(PingCodec(), protocols, cfg) + }); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; @@ -178,11 +180,12 @@ async fn emits_inbound_connection_closed_failure() { let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(PingCodec(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg)); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::with_codec(PingCodec(), protocols, cfg) + }); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await; @@ -241,11 +244,12 @@ async fn emits_inbound_connection_closed_if_channel_is_dropped() { let cfg = request_response::Config::default(); let mut swarm1 = Swarm::new_ephemeral(|_| { - request_response::Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()) + request_response::Behaviour::with_codec(PingCodec(), protocols.clone(), cfg.clone()) }); let peer1_id = *swarm1.local_peer_id(); - let mut swarm2 = - Swarm::new_ephemeral(|_| request_response::Behaviour::new(PingCodec(), protocols, cfg)); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::Behaviour::with_codec(PingCodec(), protocols, cfg) + }); let peer2_id = *swarm2.local_peer_id(); swarm1.listen().await;