Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
refactor xaynet_server services
Browse files Browse the repository at this point in the history
This refactoring started when I tried to split the message parsing
service in several components, some which I could re-use for
implementing a multipart message service. I tried to make some
simplification, and after some yak shaving ended up making quite a lot
of changes. This should make it much easier to implement the multipart
message service though.

### Add a coordinator public key validation service

This is a check we forgot to implement in the past. See also:

- http://world.std.com/~dtd/#sign_encrypt
- #514

### `PetError` removal

Only one out of the three variants is used in `xaynet_core` and
`xaynet_server`, and two in `xaynet_client`, so there is no point
sharing that error type accross the crates.

### Remove the tracing machinery

Removed `Request<T>` and the `Traceable` traits. These abstractions
were too pervasive and made the code difficult to maintain. Moreover,
out tracing implementation relied on `tower_tracing` which still
hasn't been released. Removing all these abstractions does makes the
code much more readable, and should make it easier to implement the
multipart service.

### Flatten the service error type

Many services were defined as follow:

```rust
impl Service<MyReq> for MyType {
  type Response = Result<MyResp, MyError>;
  type Error = MyServiceError;
  type Future = MyFuture

  // ..
}
```

The future returned by the service then resolved into a
`Result<Result<MyResp, MyError>, MyServiceError>`. The idea was that
the outer result would be for errors that are specific to the service
machinery, while the inner result type would be for business logic
only.

There are several problems this approach. First it induces complexity
and we quickly end up fighting the type checker when we start messing
around with the services. Second, there's not always a clear
distinction between what is business logic and what is a service
implementation detail. Finally, that's a maintenance burden because it
forces us to maintain several error types, and convert between them.

Therefore, for message services, we unified the error type under
`ServiceError`. Now services are defined as:

```rust
impl Service<MyReq> for MyType {
  type Response = MyResp;
  type Error = MyError;
  type Future = MyFuture

  // ..
}
```

### Split out the decryption service

Our work for supporting multipart messages led us to consider
delegating the encryption layer to TLS at some point. Although there
is no short term plan for that, moving the decryption logic into its
own service lays the ground for this. Moreover, it allowed us to
simplify our tests, because we can now test the message parsing
services with non-encrypted messages
  • Loading branch information
little-dude committed Sep 17, 2020
1 parent 97a64a9 commit 4d21b08
Show file tree
Hide file tree
Showing 42 changed files with 917 additions and 1,262 deletions.
13 changes: 8 additions & 5 deletions rust/xaynet-client/src/api/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@ use xaynet_core::{
SumParticipantPublicKey,
UpdateSeedDict,
};
use xaynet_server::services::{FetchError, Fetcher, PetMessageError, PetMessageHandler};
use xaynet_server::services::{
fetchers::{FetchError, Fetcher},
messages::{PetMessageHandler, ServiceError},
};

/// A client that communicates with the coordinator's API via
/// in-memory channels.
pub struct InMemoryApiClient {
fetcher: Box<dyn Fetcher + Send + Sync>,
message_handler: Box<dyn PetMessageHandler + Send + Sync>,
message_handler: PetMessageHandler,
}

impl InMemoryApiClient {
#[allow(dead_code)]
pub fn new(
fetcher: impl Fetcher + 'static + Send + Sync,
message_handler: impl PetMessageHandler + 'static + Send + Sync,
message_handler: PetMessageHandler,
) -> Self {
Self {
fetcher: Box::new(fetcher),
message_handler: Box::new(message_handler),
message_handler: message_handler,
}
}
}
Expand All @@ -33,7 +36,7 @@ impl InMemoryApiClient {
#[derive(Debug, Error)]
pub enum InMemoryApiClientError {
#[error("a PET message could not be processed by the coordinator: {0}")]
Message(#[from] PetMessageError),
Message(#[from] ServiceError),

#[error("failed to fetch data from the coordinator: {0}")]
Fetch(#[from] FetchError),
Expand Down
10 changes: 9 additions & 1 deletion rust/xaynet-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::time::Duration;
use thiserror::Error;
use tokio::time;

use xaynet_core::{crypto::ByteObject, mask::Model, CoordinatorPublicKey, InitError, PetError};
use xaynet_core::{crypto::ByteObject, mask::Model, CoordinatorPublicKey, InitError};

#[doc(hidden)]
pub mod mobile_client;
Expand All @@ -72,6 +72,14 @@ pub enum CachedModel {
I64(Vec<i64>),
}

#[derive(Debug, Error)]
pub enum PetError {
#[error("Invalid mask")]
InvalidMask,
#[error("Invalid model")]
InvalidModel,
}

#[derive(Debug, Error)]
/// Client-side errors
pub enum ClientError<E: ::std::error::Error + ::std::fmt::Debug + 'static> {
Expand Down
4 changes: 3 additions & 1 deletion rust/xaynet-client/src/mobile_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::{
ClientError,
};
use derive_more::From;
use xaynet_core::{common::RoundParameters, crypto::ByteObject, mask::Model, InitError, PetError};
use xaynet_core::{common::RoundParameters, crypto::ByteObject, mask::Model, InitError};

use crate::PetError;

#[async_trait]
pub trait LocalModel {
Expand Down
9 changes: 7 additions & 2 deletions rust/xaynet-client/src/mobile_client/participant/sum2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use xaynet_core::{
CoordinatorPublicKey,
ParticipantPublicKey,
ParticipantTaskSignature,
PetError,
SumParticipantEphemeralPublicKey,
SumParticipantEphemeralSecretKey,
UpdateSeedDict,
};

use crate::PetError;

#[derive(Serialize, Deserialize, Clone)]
pub struct Sum2 {
ephm_pk: SumParticipantEphemeralPublicKey,
Expand Down Expand Up @@ -70,7 +72,10 @@ impl Participant<Sum2> {
fn get_seeds(&self, seed_dict: &UpdateSeedDict) -> Result<Vec<MaskSeed>, PetError> {
seed_dict
.values()
.map(|seed| seed.decrypt(&self.inner.ephm_pk, &self.inner.ephm_sk))
.map(|seed| {
seed.decrypt(&self.inner.ephm_pk, &self.inner.ephm_sk)
.map_err(|_| PetError::InvalidMask)
})
.collect()
}

Expand Down
8 changes: 6 additions & 2 deletions rust/xaynet-client/src/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ use xaynet_core::{
ParticipantPublicKey,
ParticipantSecretKey,
ParticipantTaskSignature,
PetError,
SumDict,
SumParticipantEphemeralPublicKey,
SumParticipantEphemeralSecretKey,
UpdateSeedDict,
};

use crate::PetError;

#[derive(Debug, PartialEq, Copy, Clone)]
/// Tasks of a participant.
pub enum Task {
Expand Down Expand Up @@ -224,7 +225,10 @@ impl Participant {
fn get_seeds(&self, seed_dict: &UpdateSeedDict) -> Result<Vec<MaskSeed>, PetError> {
seed_dict
.values()
.map(|seed| seed.decrypt(&self.ephm_pk, &self.ephm_sk))
.map(|seed| {
seed.decrypt(&self.ephm_pk, &self.ephm_sk)
.map_err(|_| PetError::InvalidMask)
})
.collect()
}

Expand Down
9 changes: 0 additions & 9 deletions rust/xaynet-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub mod message;

use std::collections::HashMap;

use derive_more::Display;
use thiserror::Error;

use self::crypto::{
Expand All @@ -95,14 +94,6 @@ use self::crypto::{
/// An error related to insufficient system entropy for secrets at program startup.
pub struct InitError;

#[derive(Debug, Display, Error)]
/// Errors related to the PET protocol.
pub enum PetError {
InvalidMessage,
InvalidMask,
InvalidModel,
}

/// A public encryption key that identifies a coordinator.
pub type CoordinatorPublicKey = PublicEncryptKey;

Expand Down
16 changes: 12 additions & 4 deletions rust/xaynet-core/src/mask/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use derive_more::{AsMut, AsRef};
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use sodiumoxide::crypto::box_;
use thiserror::Error;

use crate::{
crypto::{encrypt::SEALBYTES, prng::generate_integer, ByteObject},
mask::{config::MaskConfig, object::MaskObject},
PetError,
SumParticipantEphemeralPublicKey,
SumParticipantEphemeralSecretKey,
};
Expand Down Expand Up @@ -99,6 +99,14 @@ impl ByteObject for EncryptedMaskSeed {
}
}

#[derive(Debug, Error)]
pub enum InvalidMaskSeed {
#[error("the encrypted mask seed could not be decrypted")]
DecryptionFailed,
#[error("the mask seed has an invalid length")]
InvalidLength,
}

impl EncryptedMaskSeed {
/// Decrypts this seed as a [`MaskSeed`].
///
Expand All @@ -108,13 +116,13 @@ impl EncryptedMaskSeed {
&self,
pk: &SumParticipantEphemeralPublicKey,
sk: &SumParticipantEphemeralSecretKey,
) -> Result<MaskSeed, PetError> {
) -> Result<MaskSeed, InvalidMaskSeed> {
MaskSeed::from_slice(
sk.decrypt(self.as_slice(), pk)
.or(Err(PetError::InvalidMask))?
.or(Err(InvalidMaskSeed::DecryptionFailed))?
.as_slice(),
)
.ok_or(PetError::InvalidMask)
.ok_or(InvalidMaskSeed::InvalidLength)
}
}

Expand Down
7 changes: 7 additions & 0 deletions rust/xaynet-core/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ pub struct MessageBuffer<T> {
}

impl<T: AsRef<[u8]>> MessageBuffer<T> {
pub fn inner(&self) -> &T {
&self.inner
}

pub fn as_ref(&self) -> MessageBuffer<&T> {
MessageBuffer::new_unchecked(self.inner())
}
/// Performs bound checks for the various message fields on `bytes` and returns a new
/// [`MessageBuffer`].
///
Expand Down
5 changes: 3 additions & 2 deletions rust/xaynet-server/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ async fn main() {
metrics_sender,
)
.unwrap();
let fetcher = services::fetcher(&event_subscriber);
let message_handler = services::message_handler(&event_subscriber, requests_tx);
let fetcher = services::fetchers::fetcher(&event_subscriber);
let message_handler =
services::messages::PetMessageHandler::new(&event_subscriber, requests_tx);

tokio::select! {
_ = state_machine.run() => {
Expand Down
2 changes: 0 additions & 2 deletions rust/xaynet-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ pub mod services;
pub mod settings;
pub mod state_machine;
pub mod storage;
pub mod utils;
pub(crate) mod vendor;

#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
#[cfg(feature = "metrics")]
Expand Down
17 changes: 8 additions & 9 deletions rust/xaynet-server/src/rest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A HTTP API for the PET protocol interactions.

use crate::services::{Fetcher, PetMessageHandler};
use crate::services::{fetchers::Fetcher, messages::PetMessageHandler};
use bytes::{Buf, Bytes};
use std::{convert::Infallible, net::SocketAddr};
use warp::{
Expand All @@ -15,13 +15,12 @@ use xaynet_core::{crypto::ByteObject, ParticipantPublicKey};
/// * `addr`: address of the server.
/// * `fetcher`: fetcher for responding to data requests.
/// * `pet_message_handler`: handler for responding to PET messages.
pub async fn serve<F, MH>(
pub async fn serve<F>(
addr: impl Into<SocketAddr> + 'static,
fetcher: F,
pet_message_handler: MH,
pet_message_handler: PetMessageHandler,
) where
F: Fetcher + Sync + Send + 'static + Clone,
MH: PetMessageHandler + Sync + Send + 'static + Clone,
{
let message = warp::path!("message")
.and(warp::post())
Expand Down Expand Up @@ -68,9 +67,9 @@ pub async fn serve<F, MH>(
}

/// Handles and responds to a PET message.
async fn handle_message<MH: PetMessageHandler>(
async fn handle_message(
body: Bytes,
mut handler: MH,
mut handler: PetMessageHandler,
) -> Result<impl warp::Reply, Infallible> {
let _ = handler.handle_message(body.to_vec()).await.map_err(|e| {
warn!("failed to handle message: {:?}", e);
Expand Down Expand Up @@ -191,9 +190,9 @@ async fn handle_params<F: Fetcher>(mut fetcher: F) -> Result<impl warp::Reply, I
}

/// Converts a PET message handler into a `warp` filter.
fn with_message_handler<MH: PetMessageHandler + Send + Sync + 'static + Clone>(
handler: MH,
) -> impl Filter<Extract = (MH,), Error = Infallible> + Clone {
fn with_message_handler(
handler: PetMessageHandler,
) -> impl Filter<Extract = (PetMessageHandler,), Error = Infallible> + Clone {
warp::any().map(move || handler.clone())
}

Expand Down
16 changes: 4 additions & 12 deletions rust/xaynet-server/src/services/fetchers/mask_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,14 @@ use std::task::{Context, Poll};

use futures::future::{self, Ready};
use tower::Service;
use tracing::Span;
use tracing_futures::{Instrument, Instrumented};

use crate::{
state_machine::events::{EventListener, EventSubscriber, MaskLengthUpdate},
utils::Traceable,
};
use crate::state_machine::events::{EventListener, EventSubscriber, MaskLengthUpdate};

/// [`MaskLengthService`]'s request type
#[derive(Default, Clone, Eq, PartialEq, Debug)]
pub struct MaskLengthRequest;

impl Traceable for MaskLengthRequest {
fn make_span(&self) -> Span {
error_span!("mask_length_fetch_request")
}
}

/// [`MaskLengthService`]'s response type.
///
/// The response is `None` when the mask length is not currently
Expand All @@ -37,7 +28,7 @@ impl MaskLengthService {
impl Service<MaskLengthRequest> for MaskLengthService {
type Response = MaskLengthResponse;
type Error = ::std::convert::Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
type Future = Instrumented<Ready<Result<Self::Response, Self::Error>>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
Expand All @@ -48,5 +39,6 @@ impl Service<MaskLengthRequest> for MaskLengthService {
MaskLengthUpdate::Invalidate => Ok(None),
MaskLengthUpdate::New(mask_length) => Ok(Some(mask_length)),
})
.instrument(error_span!("mask_length_fetch_request"))
}
}
47 changes: 39 additions & 8 deletions rust/xaynet-server/src/services/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ pub use self::{
use std::task::{Context, Poll};

use futures::future::poll_fn;
use tower::{layer::Layer, Service};
use tracing_futures::{Instrument, Instrumented};
use tower::{layer::Layer, Service, ServiceBuilder};

use crate::utils::{Request, Traceable};
use crate::state_machine::events::EventSubscriber;

/// A single interface for retrieving data from the coordinator.
#[async_trait]
Expand Down Expand Up @@ -155,20 +154,17 @@ pub(in crate::services) struct FetcherService<S>(S);
impl<S, R> Service<R> for FetcherService<S>
where
S: Service<R>,
R: Traceable,
{
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}

fn call(&mut self, req: R) -> Self::Future {
let req = Request::new(req);
let span = req.span();
self.0.call(req.into_inner()).instrument(span)
self.0.call(req)
}
}

Expand Down Expand Up @@ -210,3 +206,38 @@ impl<RoundParams, SumDict, SeedDict, MaskLength, Model>
}
}
}

/// Construct a [`Fetcher`] service
pub fn fetcher(event_subscriber: &EventSubscriber) -> impl Fetcher + Sync + Send + Clone + 'static {
let round_params = ServiceBuilder::new()
.buffer(100)
.concurrency_limit(100)
.layer(FetcherLayer)
.service(RoundParamsService::new(event_subscriber));

let mask_length = ServiceBuilder::new()
.buffer(100)
.concurrency_limit(100)
.layer(FetcherLayer)
.service(MaskLengthService::new(event_subscriber));

let model = ServiceBuilder::new()
.buffer(100)
.concurrency_limit(100)
.layer(FetcherLayer)
.service(ModelService::new(event_subscriber));

let sum_dict = ServiceBuilder::new()
.buffer(100)
.concurrency_limit(100)
.layer(FetcherLayer)
.service(SumDictService::new(event_subscriber));

let seed_dict = ServiceBuilder::new()
.buffer(100)
.concurrency_limit(100)
.layer(FetcherLayer)
.service(SeedDictService::new(event_subscriber));

Fetchers::new(round_params, sum_dict, seed_dict, mask_length, model)
}
Loading

0 comments on commit 4d21b08

Please sign in to comment.