Skip to content

Commit

Permalink
feat(request-response): add modules for json and cbor messages
Browse files Browse the repository at this point in the history
This patch adds two modules to `libp2p::request_response`:

- `cbor`
- `json`

Both define a `Behaviour` type-alias that comes with a `Codec` implementation which uses the respective `serde` crate to serialize and deserialize the messages.

Fixes #3905.

Pull-Request: #3952.
  • Loading branch information
dgarus authored May 24, 2023
1 parent 5e8f2e8 commit a5cd0d0
Show file tree
Hide file tree
Showing 9 changed files with 489 additions and 16 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) async fn new(
transport,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
request_response: request_response::Behaviour::new(
request_response: request_response::Behaviour::with_codec(
FileExchangeCodec(),
iter::once((
StreamProtocol::new("/file-exchange/1"),
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl Behaviour {
let protocols = iter::once((DEFAULT_PROTOCOL_NAME, ProtocolSupport::Full));
let mut cfg = request_response::Config::default();
cfg.set_request_timeout(config.timeout);
let inner = request_response::Behaviour::new(AutoNatCodec, protocols, cfg);
let inner = request_response::Behaviour::with_codec(AutoNatCodec, protocols, cfg);
Self {
local_peer_id,
inner,
Expand Down
7 changes: 6 additions & 1 deletion protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.25.0 - unreleased

- Add `request_response::json::Behaviour` and `request_response::cbor::Behaviour` building on top of the `serde` traits.
To conveniently construct these, we remove the `Codec` parameter from `Behaviour::new` and add `Behaviour::with_codec`.
See [PR 3952].

- Raise MSRV to 1.65.
See [PR 3715].
- Remove deprecated `RequestResponse` prefixed items. See [PR 3702].
Expand All @@ -8,10 +12,11 @@
These variants are no longer constructed.
See [PR 3605].

- Don't close connections if individual streams fail.
- Don't close connections if individual streams fail.
Log the error instead.
See [PR 3913].

[PR 3952]: https://github.com/libp2p/rust-libp2p/pull/3952
[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702
Expand Down
9 changes: 9 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
serde = { version = "1.0", optional = true}
serde_json = { version = "1.0.96", optional = true }
serde_cbor = { version = "0.11.2", optional = true }
smallvec = "1.6.1"
void = "1.0.2"
log = "0.4.17"

[features]
json = ["dep:serde", "dep:serde_json", "libp2p-swarm/macros"]
cbor = ["dep:serde", "dep:serde_cbor", "libp2p-swarm/macros"]

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.10.0"
Expand All @@ -30,6 +37,8 @@ libp2p-tcp = { workspace = true, features = ["async-io"] }
libp2p-yamux = { workspace = true }
rand = "0.8"
libp2p-swarm-test = { workspace = true }
futures_ringbuf = "0.3.1"
serde = { version = "1.0", features = ["derive"]}

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
214 changes: 214 additions & 0 deletions protocols/request-response/src/cbor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2023 Protocol Labs
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

/// A request-response behaviour using [`serde_cbor`] for serializing and deserializing the messages.
///
/// # Example
///
/// ```
/// # use libp2p_request_response::{cbor, ProtocolSupport, self as request_response};
/// # use libp2p_swarm::{StreamProtocol, SwarmBuilder};
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct GreetRequest {
/// name: String,
/// }
///
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct GreetResponse {
/// message: String,
/// }
///
/// let behaviour = cbor::Behaviour::<GreetRequest, GreetResponse>::new(
/// [(StreamProtocol::new("/my-cbor-protocol"), ProtocolSupport::Full)],
/// request_response::Config::default()
/// );
/// ```
pub type Behaviour<Req, Resp> = crate::Behaviour<codec::Codec<Req, Resp>>;

mod codec {
use async_trait::async_trait;
use futures::prelude::*;
use futures::{AsyncRead, AsyncWrite};
use libp2p_swarm::StreamProtocol;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData};

/// Max request size in bytes
const REQUEST_SIZE_MAXIMUM: u64 = 1024 * 1024;
/// Max response size in bytes
const RESPONSE_SIZE_MAXIMUM: u64 = 10 * 1024 * 1024;

pub struct Codec<Req, Resp> {
phantom: PhantomData<(Req, Resp)>,
}

impl<Req, Resp> Default for Codec<Req, Resp> {
fn default() -> Self {
Codec {
phantom: PhantomData,
}
}
}

impl<Req, Resp> Clone for Codec<Req, Resp> {
fn clone(&self) -> Self {
Self::default()
}
}

#[async_trait]
impl<Req, Resp> crate::Codec for Codec<Req, Resp>
where
Req: Send + Serialize + DeserializeOwned,
Resp: Send + Serialize + DeserializeOwned,
{
type Protocol = StreamProtocol;
type Request = Req;
type Response = Resp;

async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Req>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(REQUEST_SIZE_MAXIMUM).read_to_end(&mut vec).await?;

serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error)
}

async fn read_response<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Resp>
where
T: AsyncRead + Unpin + Send,
{
let mut vec = Vec::new();

io.take(RESPONSE_SIZE_MAXIMUM).read_to_end(&mut vec).await?;

serde_cbor::from_slice(vec.as_slice()).map_err(into_io_error)
}

async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> = serde_cbor::to_vec(&req).map_err(into_io_error)?;

io.write_all(data.as_ref()).await?;

Ok(())
}

async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
resp: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let data: Vec<u8> = serde_cbor::to_vec(&resp).map_err(into_io_error).unwrap();

io.write_all(data.as_ref()).await?;

Ok(())
}
}

fn into_io_error(err: serde_cbor::Error) -> io::Error {
if err.is_syntax() || err.is_data() {
return io::Error::new(io::ErrorKind::InvalidData, err);
}

if err.is_eof() {
return io::Error::new(io::ErrorKind::UnexpectedEof, err);
}

io::Error::new(io::ErrorKind::Other, err)
}
}

#[cfg(test)]
mod tests {
use crate::cbor::codec::Codec;
use crate::Codec as _;
use futures::AsyncWriteExt;
use futures_ringbuf::Endpoint;
use libp2p_swarm::StreamProtocol;
use serde::{Deserialize, Serialize};

#[async_std::test]
async fn test_codec() {
let expected_request = TestRequest {
payload: "test_payload".to_string(),
};
let expected_response = TestResponse {
payload: "test_payload".to_string(),
};
let protocol = StreamProtocol::new("/test_cbor/1");
let mut codec = Codec::default();

let (mut a, mut b) = Endpoint::pair(124, 124);
codec
.write_request(&protocol, &mut a, expected_request.clone())
.await
.expect("Should write request");
a.close().await.unwrap();

let actual_request = codec
.read_request(&protocol, &mut b)
.await
.expect("Should read request");
b.close().await.unwrap();

assert_eq!(actual_request, expected_request);

let (mut a, mut b) = Endpoint::pair(124, 124);
codec
.write_response(&protocol, &mut a, expected_response.clone())
.await
.expect("Should write response");
a.close().await.unwrap();

let actual_response = codec
.read_response(&protocol, &mut b)
.await
.expect("Should read response");
b.close().await.unwrap();

assert_eq!(actual_response, expected_response);
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestRequest {
payload: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestResponse {
payload: String,
}
}
Loading

0 comments on commit a5cd0d0

Please sign in to comment.