Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Steiner committed Sep 7, 2020
1 parent dead5d5 commit 3d8db52
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 29 deletions.
87 changes: 87 additions & 0 deletions rust/xaynet-server/src/storage/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,91 @@ fn redis_type_error(desc: &'static str, details: Option<String>) -> RedisError {
}
}

/// Implements ['FromRedisValue'] and ['ToRedisArgs'] for ['ByteObject'].
/// The Redis traits as well as the crypto types are both defined in foreign crates.
/// To bypass the restrictions of orphan rule, we use `Newtypes` for the crypto types.
///
/// Each crypto type has two `Newtypes`, one for reading and one for writing.
/// The difference between `Read` and `Write` is that the write `Newtype` does not take the
/// ownership of the value but only a reference. This allows us to use references in the
/// RedisStore methods. The `Read` Newtype also implements [`ToRedisArgs`] to reduce the
/// conversion overhead that you would get if you want to reuse a `Read` value for a Redis query.
///
/// Example:
///
/// ```ignore
/// let sum_pks: Vec<PublicSigningKeyRead> = self.connection.hkeys("sum_dict").await?;
/// for sum_pk in sum_pks {
/// let sum_pk_seed_dict: HashMap<PublicSigningKeyRead, EncryptedMaskSeedRead>
/// = self.connection.hgetall(&sum_pk).await?; // no need to convert sum_pk from PublicSigningKeyRead to PublicSigningKeyWrite
/// }
/// ```
///
/// # Example:
///
/// ```ignore
/// impl_byte_object_redis_traits!(EncryptedMaskSeed);
/// ```
///
/// This expands to:
///
/// ```ignore
/// impl FromRedisValue for EncryptedMaskSeedRead {
/// fn from_redis_value(v: &Value) -> RedisResult<EncryptedMaskSeedRead> {
/// match *v {
/// Value::Data(ref bytes) => {
/// let inner = <EncryptedMaskSeed>::from_slice(bytes)
/// .ok_or_else(|| redis_type_error("Invalid EncryptedMaskSeed", None))?;
/// Ok(EncryptedMaskSeedRead(inner))
/// }
/// _ => Err(redis_type_error(
/// "Response not EncryptedMaskSeed compatible",
/// None,
/// )),
/// }
/// }
/// }
/// impl ToRedisArgs for EncryptedMaskSeedRead {
/// fn write_redis_args<W>(&self, out: &mut W)
/// where
/// W: ?Sized + RedisWrite,
/// {
/// self.0.as_slice().write_redis_args(out)
/// }
/// }
/// impl<'a> ToRedisArgs for &'a EncryptedMaskSeedRead {
/// fn write_redis_args<W>(&self, out: &mut W)
/// where
/// W: ?Sized + RedisWrite,
/// {
/// self.0.as_slice().write_redis_args(out)
/// }
/// }
/// pub(crate) struct EncryptedMaskSeedWrite<'a>(&'a EncryptedMaskSeed);
/// impl<'a> ::core::convert::From<(&'a EncryptedMaskSeed)> for EncryptedMaskSeedWrite<'a> {
/// #[allow(unused_variables)]
/// #[inline]
/// fn from(original: (&'a EncryptedMaskSeed)) -> EncryptedMaskSeedWrite<'a> {
/// EncryptedMaskSeedWrite(original)
/// }
/// }
/// impl ToRedisArgs for EncryptedMaskSeedWrite<'_> {
/// fn write_redis_args<W>(&self, out: &mut W)
/// where
/// W: ?Sized + RedisWrite,
/// {
/// self.0.as_slice().write_redis_args(out)
/// }
/// }
/// impl<'a> ToRedisArgs for &'a EncryptedMaskSeedWrite<'a> {
/// fn write_redis_args<W>(&self, out: &mut W)
/// where
/// W: ?Sized + RedisWrite,
/// {
/// self.0.as_slice().write_redis_args(out)
/// }
/// }
/// ```
macro_rules! impl_byte_object_redis_traits {
($ty: ty) => {
paste! {
Expand Down Expand Up @@ -84,6 +169,8 @@ impl_byte_object_redis_traits!(PublicEncryptKey);
impl_byte_object_redis_traits!(PublicSigningKey);
impl_byte_object_redis_traits!(EncryptedMaskSeed);

/// Same as the macro [`impl_byte_object_redis_traits!`] but it uses bincode to de/serialize the
/// data.
macro_rules! impl_bincode_redis_traits {
($ty: ty) => {
impl FromRedisValue for $ty {
Expand Down
67 changes: 38 additions & 29 deletions rust/xaynet-server/src/storage/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ use crate::{
PublicSigningKeyWrite,
},
};
use redis::{aio::MultiplexedConnection, AsyncCommands, Client, RedisError, RedisResult};
use redis::{
aio::MultiplexedConnection,
AsyncCommands,
Client,
IntoConnectionInfo,
RedisError,
RedisResult,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -39,18 +46,24 @@ pub struct Connection {
}

impl RedisStore {
/// Create a new store. `url` is the URL to connect to the redis
/// instance, and `n` is the maximum number of concurrent
/// connections to the redis instance.
pub async fn new<S: Into<String>>(url: S, n: usize) -> Result<Self, RedisError> {
let client = Client::open(url.into())?;
/// Creates a new Redis store.
///
/// `url` to which Redis instance the client should connect to.
/// The URL format is `redis://[<username>][:<passwd>@]<hostname>[:port][/<db>]`.
/// `n` is the maximum number of concurrent connections to the redis instance.
pub async fn new<T: IntoConnectionInfo>(url: T, n: usize) -> Result<Self, RedisError> {
let client = Client::open(url)?;
let connection = client.get_multiplexed_tokio_connection().await?;
Ok(Self {
raw_connection: connection,
semaphore: Arc::new(Semaphore::new(n)),
})
}

/// Acquires a new connection.
///
/// If no remaining connections are available, the method waits until an
/// outstanding connection is dropped.
pub async fn connection(self) -> Connection {
let _permit = self.semaphore.acquire_owned().await;
Connection {
Expand All @@ -61,18 +74,19 @@ impl RedisStore {
}

impl Connection {
/// Retrieve the [`CoordinatorState`].
/// Retrieves a [`CoordinatorState`].
pub async fn get_coordinator_state(mut self) -> Result<CoordinatorState, RedisError> {
self.connection.get("coordinator_state").await
}

/// Store the [`CoordinatorState`].
/// Stores a [`CoordinatorState`].
///
/// If the coordinator state already exists, it is overwritten.
pub async fn set_coordinator_state(mut self, state: &CoordinatorState) -> RedisResult<()> {
self.connection.set("coordinator_state", state).await
}

/// Retrieve the entries [`SumDict`].
/// Retrieves the entries [`SumDict`].
pub async fn get_sum_dict(mut self) -> Result<SumDict, RedisError> {
let result: Vec<(PublicSigningKeyRead, PublicEncryptKeyRead)> =
self.connection.hgetall("sum_dict").await?;
Expand All @@ -84,7 +98,8 @@ impl Connection {
Ok(sum_dict)
}

/// Store a new [`SumDict`] entry.
/// Stores a new [`SumDict`] entry.
///
/// Returns `1` if field is a new and `0` if field already exists.
pub async fn add_sum_participant(
mut self,
Expand All @@ -100,7 +115,8 @@ impl Connection {
.await
}

/// Remove an entry in the [`SumDict`].
/// Removes an entry in the [`SumDict`].
///
/// Returns `1` if field was deleted and `0` if field does not exists.
pub async fn remove_sum_dict_entry(
mut self,
Expand All @@ -111,20 +127,20 @@ impl Connection {
.await
}

/// Retrieve the length of the [`SumDict`].
/// Retrieves the length of the [`SumDict`].
pub async fn get_sum_dict_len(mut self) -> Result<usize, RedisError> {
self.connection.hlen("sum_dict").await
}

/// Retrieve the sum_pks of the [`SumDict`].
/// Retrieves the sum_pks of the [`SumDict`].
pub async fn get_sum_pks(mut self) -> Result<HashSet<SumParticipantPublicKey>, RedisError> {
let result: HashSet<PublicSigningKeyRead> = self.connection.hkeys("sum_dict").await?;
let sum_pks = result.into_iter().map(|pk| pk.into()).collect();

Ok(sum_pks)
}

/// Retrieve [`SeedDict`] entry for the given sum participant.
/// Retrieves [`SeedDict`] entry for the given ['SumParticipantPublicKey'].
pub async fn get_seed_dict_for_sum_pk(
mut self,
sum_pk: &SumParticipantPublicKey,
Expand All @@ -141,7 +157,7 @@ impl Connection {
Ok(seed_dict)
}

/// Retrieve the whole [`SeedDict`].
/// Retrieves the full [`SeedDict`].
pub async fn get_seed_dict(mut self) -> Result<SeedDict, RedisError> {
let sum_pks: Vec<PublicSigningKeyRead> = self.connection.hkeys("sum_dict").await?;

Expand All @@ -161,8 +177,8 @@ impl Connection {
Ok(seed_dict)
}

/// Update the [`SeedDict`] with the seeds from the given update
/// participant, and return the number of participants that already submitted an update.
/// Updates the [`SeedDict`] with the seeds from the given ['UpdateParticipantPublicKey'],
/// and returns the number of participants that already submitted an update.
pub async fn update_seed_dict(
mut self,
update_pk: &UpdateParticipantPublicKey,
Expand All @@ -186,7 +202,8 @@ impl Connection {
pipe.atomic().query_async(&mut self.connection).await
}

/// Update the [`MaskDict`] with the given [`MaskObject`].
/// Updates the [`MaskDict`] with the given [`MaskObject`].
///
/// The score/counter of the given mask is incremented by `1`.
pub async fn incr_mask_count(mut self, mask: &MaskObject) -> RedisResult<()> {
redis::pipe()
Expand All @@ -196,7 +213,7 @@ impl Connection {
Ok(())
}

/// Retrieve the two masks with the highest score.
/// Retrieves the two masks with the highest score.
pub async fn get_best_masks(mut self) -> Result<Vec<(MaskObject, usize)>, RedisError> {
let result: Vec<(MaskObjectRead, usize)> = self
.connection
Expand All @@ -209,23 +226,15 @@ impl Connection {
.collect())
}

pub async fn schedule_snapshot(mut self) -> RedisResult<()> {
redis::cmd("BGSAVE")
.arg("SCHEDULE")
.query_async(&mut self.connection)
.await?;
Ok(())
}

/// Delete all data in the current database.
/// Deletes all data in the current database.
pub async fn flushdb(mut self) -> RedisResult<()> {
redis::cmd("FLUSHDB")
.arg("ASYNC")
.query_async(&mut self.connection)
.await
}

/// Delete the dictionaries [`SumDict`], [`SeedDict`] and [`MaskDict`].
/// Deletes the dictionaries [`SumDict`], [`SeedDict`] and [`MaskDict`].
pub async fn flush_dicts(mut self) -> RedisResult<()> {
let sum_pks: Vec<PublicSigningKeyRead> = self.connection.hkeys("sum_dict").await?;
let mut pipe = redis::pipe();
Expand Down

0 comments on commit 3d8db52

Please sign in to comment.