Skip to content

Commit

Permalink
fix: update handling of SAF message propagation and deletion (#3164)
Browse files Browse the repository at this point in the history
## Description

This PR adds two changes to the way SAF messages are handled to fix two subtle bugs spotted while developing cucumber tests.

The first issue was that when a Node propagates a SAF message it was storing to other nodes in its neighbourhood the broadcast strategy it was using only chose from currently connected base nodes. This meant that if the Node had an active connection to a Communication Client (wallet) it would not just directly send the SAF message to that client but to other base nodes in the network region. This meant that the wallet would only receive new SAF message when it actively requested them on connection even though it was directly connected to the node.

This PR adds a new broadcast strategy called `DirectOrClosestNodes` which will first check if the node has a direct active connection and if it does just send the SAF message directly to its destination.

The second issue was a subtle problem where when a node starts to send SAF messages to a destination it would remove the messages from the database based only on whether the outbound messages were put onto the outbound message pipeline. The problem occurs when the TCP connection to that peer is actually broken the sending of those messages would fail at the end of the pipeline but the SAF messages were already deleted from the database.

This PR changes the way SAF messages are deleted. When a client asks a node for SAF message it will also provide a timestamp of the most recent SAF message it has received. The Node will then send all SAF messages since that timestamp that it has for the node and will delete all SAF messages from before the specified Timestamp.  This serves as a form of Ack that the client has received the older messages at some point and they are no longer needed.

## How Has This Been Tested?
Unit tests have been updated to test this functionality.

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
aviator-app[bot] authored Aug 11, 2021
2 parents 4529cc7 + a65eecb commit cedb4ef
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 122 deletions.
119 changes: 81 additions & 38 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! [DhtRequest]: ./enum.DhtRequest.html

use crate::{
broadcast_strategy::BroadcastStrategy,
broadcast_strategy::{BroadcastClosestRequest, BroadcastStrategy},
dedup::DedupCacheDatabase,
discovery::DhtDiscoveryError,
outbound::{DhtOutboundError, OutboundMessageRequester, SendMessageParams},
Expand Down Expand Up @@ -416,43 +416,19 @@ impl DhtActor {
.await?;
Ok(peers.into_iter().map(|p| p.peer_node_id().clone()).collect())
},
Closest(closest_request) => {
let connections = connectivity
.select_connections(ConnectivitySelection::closest_to(
closest_request.node_id.clone(),
config.broadcast_factor,
closest_request.excluded_peers.clone(),
))
.await?;

let mut candidates = connections
.iter()
.map(|conn| conn.peer_node_id())
.cloned()
.collect::<Vec<_>>();

if !closest_request.connected_only {
let excluded = closest_request
.excluded_peers
.iter()
.chain(candidates.iter())
.cloned()
.collect::<Vec<_>>();
// If we don't have enough connections, let's select some more disconnected peers (at least 2)
let n = cmp::max(config.broadcast_factor.saturating_sub(candidates.len()), 2);
let additional = Self::select_closest_peers_for_propagation(
&peer_manager,
&closest_request.node_id,
n,
&excluded,
PeerFeatures::MESSAGE_PROPAGATION,
)
.await?;

candidates.extend(additional);
ClosestNodes(closest_request) => {
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager).await
},
DirectOrClosestNodes(closest_request) => {
// First check if a direct connection exists
if connectivity
.get_connection(closest_request.node_id.clone())
.await?
.is_some()
{
return Ok(vec![closest_request.node_id.clone()]);
}

Ok(candidates)
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager).await
},
Random(n, excluded) => {
// Send to a random set of peers of size n that are Communication Nodes
Expand Down Expand Up @@ -659,6 +635,50 @@ impl DhtActor {

Ok(peers.into_iter().map(|p| p.node_id).collect())
}

async fn select_closest_node_connected(
closest_request: Box<BroadcastClosestRequest>,
config: DhtConfig,
mut connectivity: ConnectivityRequester,
peer_manager: Arc<PeerManager>,
) -> Result<Vec<NodeId>, DhtActorError> {
let connections = connectivity
.select_connections(ConnectivitySelection::closest_to(
closest_request.node_id.clone(),
config.broadcast_factor,
closest_request.excluded_peers.clone(),
))
.await?;

let mut candidates = connections
.iter()
.map(|conn| conn.peer_node_id())
.cloned()
.collect::<Vec<_>>();

if !closest_request.connected_only {
let excluded = closest_request
.excluded_peers
.iter()
.chain(candidates.iter())
.cloned()
.collect::<Vec<_>>();
// If we don't have enough connections, let's select some more disconnected peers (at least 2)
let n = cmp::max(config.broadcast_factor.saturating_sub(candidates.len()), 2);
let additional = Self::select_closest_peers_for_propagation(
&peer_manager,
&closest_request.node_id,
n,
&excluded,
PeerFeatures::MESSAGE_PROPAGATION,
)
.await?;

candidates.extend(additional);
}

Ok(candidates)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -888,6 +908,7 @@ mod test {
connectivity_manager_mock_state
.set_selected_connections(vec![conn_out.clone()])
.await;

let peers = requester
.select_peers(BroadcastStrategy::Broadcast(Vec::new()))
.await
Expand Down Expand Up @@ -915,7 +936,29 @@ mod test {
connected_only: false,
});
let peers = requester
.select_peers(BroadcastStrategy::Closest(send_request))
.select_peers(BroadcastStrategy::ClosestNodes(send_request))
.await
.unwrap();
assert_eq!(peers.len(), 2);

let send_request = Box::new(BroadcastClosestRequest {
node_id: node_identity.node_id().clone(),
excluded_peers: vec![],
connected_only: false,
});
let peers = requester
.select_peers(BroadcastStrategy::DirectOrClosestNodes(send_request))
.await
.unwrap();
assert_eq!(peers.len(), 1);

let send_request = Box::new(BroadcastClosestRequest {
node_id: client_node_identity.node_id().clone(),
excluded_peers: vec![],
connected_only: false,
});
let peers = requester
.select_peers(BroadcastStrategy::DirectOrClosestNodes(send_request))
.await
.unwrap();
assert_eq!(peers.len(), 2);
Expand Down
30 changes: 19 additions & 11 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ pub enum BroadcastStrategy {
/// Send to a random set of peers of size n that are Communication Nodes, excluding the given node IDs
Random(usize, Vec<NodeId>),
/// Send to all n nearest Communication Nodes according to the given BroadcastClosestRequest
Closest(Box<BroadcastClosestRequest>),
ClosestNodes(Box<BroadcastClosestRequest>),
/// Send directly to destination if connected but otherwise send to all n nearest Communication Nodes
DirectOrClosestNodes(Box<BroadcastClosestRequest>),
Broadcast(Vec<NodeId>),
/// Propagate to a set of closest neighbours and random peers
Propagate(NodeDestination, Vec<NodeId>),
Expand All @@ -70,7 +72,8 @@ impl fmt::Display for BroadcastStrategy {
DirectPublicKey(pk) => write!(f, "DirectPublicKey({})", pk),
DirectNodeId(node_id) => write!(f, "DirectNodeId({})", node_id),
Flood(excluded) => write!(f, "Flood({} excluded)", excluded.len()),
Closest(request) => write!(f, "Closest({})", request),
ClosestNodes(request) => write!(f, "ClosestNodes({})", request),
DirectOrClosestNodes(request) => write!(f, "DirectOrClosestNodes({})", request),
Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()),
Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()),
Propagate(destination, excluded) => write!(f, "Propagate({}, {} excluded)", destination, excluded.len(),),
Expand All @@ -79,13 +82,18 @@ impl fmt::Display for BroadcastStrategy {
}

impl BroadcastStrategy {
/// Returns true if this strategy will send multiple messages, otherwise false
pub fn is_multi_message(&self) -> bool {
/// Returns true if this strategy will send multiple indirect messages, otherwise false
pub fn is_multi_message(&self, chosen_peers: &[NodeId]) -> bool {
use BroadcastStrategy::*;
matches!(
self,
Closest(_) | Flood(_) | Broadcast(_) | Random(_, _) | Propagate(_, _)
)

match self {
DirectOrClosestNodes(strategy) => {
// Testing if there is a single chosen peer and it is the target NodeId
chosen_peers.len() == 1 && chosen_peers.first() == Some(&strategy.node_id)
},
ClosestNodes(_) | Broadcast(_) | Propagate(_, _) | Flood(_) | Random(_, _) => true,
_ => false,
}
}

pub fn is_direct(&self) -> bool {
Expand Down Expand Up @@ -129,7 +137,7 @@ mod test {
assert!(!BroadcastStrategy::Broadcast(Default::default()).is_direct());
assert!(!BroadcastStrategy::Propagate(Default::default(), Default::default()).is_direct(),);
assert!(!BroadcastStrategy::Flood(Default::default()).is_direct());
assert!(!BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
assert!(!BroadcastStrategy::ClosestNodes(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
excluded_peers: Default::default(),
connected_only: false
Expand All @@ -152,7 +160,7 @@ mod test {
assert!(BroadcastStrategy::Flood(Default::default())
.direct_public_key()
.is_none());
assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
assert!(BroadcastStrategy::ClosestNodes(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
excluded_peers: Default::default(),
connected_only: false
Expand All @@ -174,7 +182,7 @@ mod test {
.direct_node_id()
.is_none());
assert!(BroadcastStrategy::Flood(Default::default()).direct_node_id().is_none());
assert!(BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
assert!(BroadcastStrategy::ClosestNodes(Box::new(BroadcastClosestRequest {
node_id: NodeId::default(),
excluded_peers: Default::default(),
connected_only: false
Expand Down
6 changes: 0 additions & 6 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ pub struct DhtConfig {
pub saf_max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub saf_auto_request: bool,
/// The minimum period used to request SAF messages from a peer. When requesting SAF messages,
/// it will request messages since the DHT last went offline, but this may be a small amount of
/// time, so `minimum_request_period` can be used so that messages aren't missed.
/// Default: 3 days
pub saf_minimum_request_period: Duration,
/// The max capacity of the message hash cache
/// Default: 2,500
pub dedup_cache_capacity: usize,
Expand Down Expand Up @@ -154,7 +149,6 @@ impl Default for DhtConfig {
saf_high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
saf_auto_request: true,
saf_max_message_size: 512 * 1024,
saf_minimum_request_period: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
database_url: DbConnectionUrl::Memory,
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
is_discovery_enabled,
);

let is_broadcast = broadcast_strategy.is_multi_message();
let is_broadcast = broadcast_strategy.is_multi_message(&peers);

// Discovery is required if:
// - Discovery is enabled for this request
Expand Down
20 changes: 16 additions & 4 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,25 +116,37 @@ impl SendMessageParams {
/// `node_id` - Select the closest known peers to this `NodeId`
/// `excluded_peers` - vector of `NodeId`s to exclude from broadcast.
pub fn closest(&mut self, node_id: NodeId, excluded_peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
self.params_mut().broadcast_strategy = BroadcastStrategy::ClosestNodes(Box::new(BroadcastClosestRequest {
excluded_peers,
node_id,
connected_only: false,
}));
self
}

/// Set broadcast_strategy to Closest.`excluded_peers` are excluded. Only peers that are currently connected will be
/// included.
/// Set broadcast_strategy to ClosestNodes.`excluded_peers` are excluded. Only peers that are currently connected
/// will be included.
pub fn closest_connected(&mut self, node_id: NodeId, excluded_peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::Closest(Box::new(BroadcastClosestRequest {
self.params_mut().broadcast_strategy = BroadcastStrategy::ClosestNodes(Box::new(BroadcastClosestRequest {
excluded_peers,
node_id,
connected_only: true,
}));
self
}

/// Set broadcast_strategy to DirectOrClosestNodes.`excluded_peers` are excluded. Only peers that are currently
/// connected will be included.
pub fn direct_or_closest_connected(&mut self, node_id: NodeId, excluded_peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy =
BroadcastStrategy::DirectOrClosestNodes(Box::new(BroadcastClosestRequest {
excluded_peers,
node_id,
connected_only: true,
}));
self
}

/// Set broadcast_strategy to Neighbours. `excluded_peers` are excluded. Only Peers that have
/// `PeerFeatures::MESSAGE_PROPAGATION` are included.
pub fn broadcast(&mut self, excluded_peers: Vec<NodeId>) -> &mut Self {
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl OutboundServiceMock {
},
};
},
BroadcastStrategy::Closest(_) => {
BroadcastStrategy::ClosestNodes(_) => {
if behaviour.broadcast == ResponseType::Queued {
let (response, mut inner_reply_tx) = self.add_call((*params).clone(), body);
reply_tx.send(response).expect("Reply channel cancelled");
Expand Down
2 changes: 2 additions & 0 deletions comms/dht/src/storage/dht_setting_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use std::fmt;
pub enum DhtMetadataKey {
/// Timestamp each time the DHT is shut down
OfflineTimestamp,
/// Timestamp of the most recent SAF message received
LastSafMessageReceived,
}

impl fmt::Display for DhtMetadataKey {
Expand Down
11 changes: 11 additions & 0 deletions comms/dht/src/store_forward/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ impl StoreAndForwardDatabase {
.await
}

pub(crate) async fn delete_messages_older_than(&self, since: NaiveDateTime) -> Result<usize, StorageError> {
self.connection
.with_connection_async(move |conn| {
diesel::delete(stored_messages::table)
.filter(stored_messages::stored_at.lt(since))
.execute(conn)
.map_err(Into::into)
})
.await
}

pub(crate) async fn truncate_messages(&self, max_size: usize) -> Result<usize, StorageError> {
self.connection
.with_connection_async(move |conn| {
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/store_forward/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
target: LOG_TARGET,
"Forwarding SAF message directly to node: {}, Tag#{}", node_id, dht_header.message_tag
);
send_params.closest_connected(node_id.clone(), excluded_peers);
send_params.direct_or_closest_connected(node_id.clone(), excluded_peers);
},
_ => {
debug!(
Expand Down
9 changes: 7 additions & 2 deletions comms/dht/src/store_forward/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ impl StoredMessagesRequest {

#[cfg(test)]
impl StoredMessage {
pub fn new(version: u32, dht_header: crate::envelope::DhtMessageHeader, body: Vec<u8>) -> Self {
pub fn new(
version: u32,
dht_header: crate::envelope::DhtMessageHeader,
body: Vec<u8>,
stored_at: DateTime<Utc>,
) -> Self {
Self {
version,
dht_header: Some(dht_header.into()),
body,
stored_at: Some(datetime_to_timestamp(Utc::now())),
stored_at: Some(datetime_to_timestamp(stored_at)),
}
}
}
Expand Down
Loading

0 comments on commit cedb4ef

Please sign in to comment.