Skip to content

Commit

Permalink
Fix: For single-threaded execution, there is no need for Send/`Sync…
Browse files Browse the repository at this point in the history
…` bounds.

Some preliminary implementation with `OptionalSend`/`OptionalSync` was done
previously, it was not complete, though.

Currently, `Send`/`Sync` bounds are required for `AppData`, `RaftEntry`,
async runtime wrappers and other types, but they are only relevant for
multi-threaded access. If `openraft` is configured to use single-threaded
runtime, then they don't have to be `Send`/`Sync`.

Consequently replace `Send`/`Sync` bounds with `OptionalSend`/`OptionalSync`
bounds to clean it up.

Even if there are no `Send`/`Sync` bounds required, the `Raft` object can
still be `Send`/`Sync` capable. It is only an API object sending requests
to the Raft main loop over a channel. As long as the involved data types
are `Send` (and some of them, which are used in `RaftInner`, also `Sync`),
we can declare `Raft` as such `Send`/`Sync`.

This change also fixes `timeout.rs`, which seems to be unused so far, but
didn't properly use `AsyncRuntime` abstraction.

Two points for the future:
- We should add a generic test invocation to invoke tests on `LocalSet`
  for tests with `singlethreaded` feature. Currently, there is a single
  test only, in `timeout_test.rs`.
- Later, even `Arc` could be replaced with a single-threaded counterpart
  to prevent the need for atomic refcounting.
  • Loading branch information
schreter committed Nov 18, 2023
1 parent 91888b1 commit 4ad89fe
Show file tree
Hide file tree
Showing 19 changed files with 159 additions and 53 deletions.
6 changes: 3 additions & 3 deletions openraft/src/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use crate::TokioInstant;
/// ## Note
///
/// The default asynchronous runtime is `tokio`.
pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
pub trait AsyncRuntime: Debug + Default + OptionalSend + OptionalSync + 'static {
/// The error type of [`Self::JoinHandle`].
type JoinError: Debug + Display + Send;
type JoinError: Debug + Display + OptionalSend;

/// The return type of [`Self::spawn`].
type JoinHandle<T: OptionalSend + 'static>: Future<Output = Result<T, Self::JoinError>>
Expand All @@ -33,7 +33,7 @@ pub trait AsyncRuntime: Debug + Default + Send + Sync + 'static {
type Instant: Instant;

/// The timeout error type.
type TimeoutError: Debug + Display + Send;
type TimeoutError: Debug + Display + OptionalSend;

/// The timeout type used by [`Self::timeout`] and [`Self::timeout_at`] that enables the user
/// to await the outcome of a [`Future`].
Expand Down
8 changes: 7 additions & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,19 @@ where
tx: ResultSender<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
},

#[allow(clippy::type_complexity)]
ExternalRequest {
#[allow(clippy::type_complexity)]
#[cfg(not(feature = "singlethreaded"))]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ 'static,
>,
#[cfg(feature = "singlethreaded")]
req: Box<
dyn FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ 'static,
>,
},

ExternalCommand {
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/defensive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::ops::RangeBounds;
use crate::log_id::RaftLogId;
use crate::DefensiveError;
use crate::ErrorSubject;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Violation;

pub fn check_range_matches_entries<C: RaftTypeConfig, RB: RangeBounds<u64> + Debug + Send>(
pub fn check_range_matches_entries<C: RaftTypeConfig, RB: RangeBounds<u64> + Debug + OptionalSend>(
range: RB,
entries: &[C::Entry],
) -> Result<(), StorageError<C::NodeId>> {
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/docs/feature_flags/feature-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ By default openraft enables no features.
V2 storage separates log store and state machine store so that log IO and state machine IO can be parallelized naturally.
<br/><br/>

- `singlethreaded`: removes `Send` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, and `SnapshotData` to force the
asynchronous runtime to spawn any tasks in the current thread.
- `singlethreaded`: removes `Send` and `Sync` bounds from `AppData`, `AppDataResponse`, `RaftEntry`, `SnapshotData`
and other types to force the asynchronous runtime to spawn any tasks in the current thread.
This is for any single-threaded application that never allows a raft instance to be shared among multiple threads.
In order to use the feature, `AsyncRuntime::spawn` should invoke `tokio::task::spawn_local` or equivalents.
<br/><br/>
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/entry/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::Node;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSerde;
use crate::OptionalSync;

/// Defines operations on an entry payload.
pub trait RaftPayload<NID, N>
Expand All @@ -27,7 +28,7 @@ pub trait RaftEntry<NID, N>: RaftPayload<NID, N> + RaftLogId<NID>
where
N: Node,
NID: NodeId,
Self: OptionalSerde + Debug + Display + OptionalSend + Sync,
Self: OptionalSerde + Debug + Display + OptionalSend + OptionalSync,
{
/// Create a new blank log entry.
///
Expand Down
7 changes: 5 additions & 2 deletions openraft/src/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::panic::RefUnwindSafe;
use std::panic::UnwindSafe;
use std::time::Duration;

use crate::OptionalSend;
use crate::OptionalSync;

/// A measurement of a monotonically non-decreasing clock.
pub trait Instant:
Add<Duration, Output = Self>
Expand All @@ -19,11 +22,11 @@ pub trait Instant:
+ PartialEq
+ PartialOrd
+ RefUnwindSafe
+ Send
+ OptionalSend
+ Sub<Duration, Output = Self>
+ Sub<Self, Output = Duration>
+ SubAssign<Duration>
+ Sync
+ OptionalSync
+ Unpin
+ UnwindSafe
+ 'static
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ impl<T: Sync + ?Sized> OptionalSync for T {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppData: OptionalSend + Sync + 'static + OptionalSerde {}
pub trait AppData: OptionalSend + OptionalSync + 'static + OptionalSerde {}

impl<T> AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
impl<T> AppData for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {}

/// A trait defining application specific response data.
///
Expand All @@ -210,6 +210,6 @@ impl<T> AppData for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
/// ## Note
///
/// The trait is automatically implemented for all types which satisfy its supertraits.
pub trait AppDataResponse: OptionalSend + Sync + 'static + OptionalSerde {}
pub trait AppDataResponse: OptionalSend + OptionalSync + 'static + OptionalSerde {}

impl<T> AppDataResponse for T where T: OptionalSend + Sync + 'static + OptionalSerde {}
impl<T> AppDataResponse for T where T: OptionalSend + OptionalSync + 'static + OptionalSerde {}
7 changes: 6 additions & 1 deletion openraft/src/network/backoff.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::time::Duration;

use crate::OptionalSend;

/// A backoff instance that is an infinite iterator of durations to sleep before next retry, when a
/// [`Unreachable`](`crate::error::Unreachable`) occurs.
pub struct Backoff {
#[cfg(not(feature = "singlethreaded"))]
inner: Box<dyn Iterator<Item = Duration> + Send + 'static>,
#[cfg(feature = "singlethreaded")]
inner: Box<dyn Iterator<Item = Duration> + 'static>,
}

impl Backoff {
pub fn new(iter: impl Iterator<Item = Duration> + Send + 'static) -> Self {
pub fn new(iter: impl Iterator<Item = Duration> + OptionalSend + 'static) -> Self {
Self { inner: Box::new(iter) }
}
}
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/network/factory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use macros::add_async_trait;

use crate::network::RaftNetwork;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;

/// A trait defining the interface for a Raft network factory to create connections between cluster
Expand All @@ -12,7 +14,7 @@ use crate::RaftTypeConfig;
/// Typically, the network implementation as such will be hidden behind a `Box<T>` or `Arc<T>` and
/// this interface implemented on the `Box<T>` or `Arc<T>`.
#[add_async_trait]
pub trait RaftNetworkFactory<C>: Send + Sync + 'static
pub trait RaftNetworkFactory<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Actual type of the network handling a single connection.
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/network/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;

/// A trait defining the interface for a Raft network between cluster members.
Expand All @@ -36,7 +37,7 @@ use crate::RaftTypeConfig;
///
/// - Implementing the new APIs will disable the old APIs.
#[add_async_trait]
pub trait RaftNetwork<C>: OptionalSend + Sync + 'static
pub trait RaftNetwork<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Send an AppendEntries RPC to the target.
Expand Down
30 changes: 25 additions & 5 deletions openraft/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,32 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::hash::Hash;

use crate::OptionalSend;
use crate::OptionalSync;

/// Essential trait bound for node-id, except serde.
#[doc(hidden)]
pub trait NodeIdEssential:
Sized + Send + Sync + Eq + PartialEq + Ord + PartialOrd + Debug + Display + Hash + Copy + Clone + Default + 'static
Sized
+ OptionalSend
+ OptionalSync
+ Eq
+ PartialEq
+ Ord
+ PartialOrd
+ Debug
+ Display
+ Hash
+ Copy
+ Clone
+ Default
+ 'static
{
}

impl<T> NodeIdEssential for T where T: Sized
+ Send
+ Sync
+ OptionalSend
+ OptionalSync
+ Eq
+ PartialEq
+ Ord
Expand Down Expand Up @@ -43,8 +59,12 @@ pub trait NodeId: NodeIdEssential {}
impl<T> NodeId for T where T: NodeIdEssential {}

/// Essential trait bound for application level node-data, except serde.
pub trait NodeEssential: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {}
impl<T> NodeEssential for T where T: Sized + Send + Sync + Eq + PartialEq + Debug + Clone + Default + 'static {}
pub trait NodeEssential:
Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static
{
}
impl<T> NodeEssential for T where T: Sized + OptionalSend + OptionalSync + Eq + PartialEq + Debug + Clone + Default + 'static
{}

/// A Raft `Node`, this trait holds all relevant node information.
///
Expand Down
44 changes: 43 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::ChangeMembers;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::OptionalSend;
use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::StorageHelper;
Expand Down Expand Up @@ -149,6 +150,47 @@ where
}
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// The API object just sends the requests to the Raft loop over a channel. If all the relevant
// types in the type config are `Send`, then it's safe to send the request across threads over
// the channel.
//
// Notably, the state machine, log storage and network factory DO NOT have to be `Send`, those
// are only used within Raft task(s) on a single thread.
unsafe impl<C, N, LS, SM> Send for Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
{
}

#[cfg(feature = "singlethreaded")]
// SAFETY: Even for a single-threaded Raft, the API object is MT-capable.
//
// See above for details.
unsafe impl<C, N, LS, SM> Sync for Raft<C, N, LS, SM>
where
C: RaftTypeConfig + Send,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
C::D: Send,
C::Entry: Send,
C::Node: Send + Sync,
C::NodeId: Send + Sync,
C::R: Send,
{
}

impl<C, N, LS, SM> Raft<C, N, LS, SM>
where
C: RaftTypeConfig,
Expand Down Expand Up @@ -698,7 +740,7 @@ where
/// destroyed right away and not called at all.
pub fn external_request<
F: FnOnce(&RaftState<C::NodeId, C::Node, <C::AsyncRuntime as AsyncRuntime>::Instant>, &mut LS, &mut N)
+ Send
+ OptionalSend
+ 'static,
>(
&self,
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/storage/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::storage::RaftStateMachine;
use crate::LogId;
use crate::LogState;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogReader;
use crate::RaftStorage;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -103,7 +104,7 @@ where
C: RaftTypeConfig,
S: RaftStorage<C>,
{
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/storage/log_store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use macros::add_async_trait;

use crate::defensive::check_range_matches_entries;
use crate::LogId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftTypeConfig;
Expand All @@ -26,7 +28,7 @@ where C: RaftTypeConfig
///
/// Similar to `try_get_log_entries` except an error will be returned if there is an entry not
/// found in the specified range.
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::OptionalSend;
use crate::OptionalSync;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::StoredMembership;
Expand Down Expand Up @@ -138,7 +139,7 @@ pub struct LogState<C: RaftTypeConfig> {
/// this interface implemented on the `Arc<T>`. It can be co-implemented with [`RaftStorage`]
/// interface on the same cloneable object, if the underlying state machine is anyway synchronized.
#[add_async_trait]
pub trait RaftLogReader<C>: Send + Sync + 'static
pub trait RaftLogReader<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Get a series of log entries from storage.
Expand All @@ -147,7 +148,7 @@ where C: RaftTypeConfig
/// stop)`.
///
/// Entry that is not found is allowed.
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend + OptionalSync>(
&mut self,
range: RB,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>>;
Expand All @@ -162,7 +163,7 @@ where C: RaftTypeConfig
/// co-implemented with [`RaftStorage`] interface on the same cloneable object, if the underlying
/// state machine is anyway synchronized.
#[add_async_trait]
pub trait RaftSnapshotBuilder<C>: Send + Sync + 'static
pub trait RaftSnapshotBuilder<C>: OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Build snapshot
Expand Down Expand Up @@ -192,7 +193,7 @@ where C: RaftTypeConfig
/// The implementation of the API has to cope with (infrequent) concurrent access from these two
/// components.
#[add_async_trait]
pub trait RaftStorage<C>: RaftLogReader<C> + Send + Sync + 'static
pub trait RaftStorage<C>: RaftLogReader<C> + OptionalSend + OptionalSync + 'static
where C: RaftTypeConfig
{
/// Log reader type.
Expand Down
Loading

0 comments on commit 4ad89fe

Please sign in to comment.