Commit

Fix orders "leak" due to race condition #1173. (#1178)
artemii235 committed Dec 28, 2021
1 parent fcff9ca commit be163e1
Showing 10 changed files with 199 additions and 232 deletions.
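The gist of the change in this file: the libp2p command sender (cmd_tx) moves from a futures::lock::Mutex, which can only be locked with .await, to a synchronous parking_lot::Mutex; the fire-and-forget spawn(async move { ... }) wrappers in broadcast_p2p_msg, propagate_message, and add_reserved_peer_addresses are dropped; and subscribe_to_topic and process_p2p_request become plain synchronous functions. Because parking_lot's lock() never awaits and the channel's try_send never blocks, each command is now queued in the caller's own call order. The sketch below illustrates the pattern only; Cmd, P2PContextSketch, and send are hypothetical names, not the crate's real API (the real code uses AdexBehaviourCmd and AdexCmdTx).

```rust
// Illustrative sketch only: `Cmd`, `P2PContextSketch`, and `send` are
// hypothetical names standing in for AdexBehaviourCmd / AdexCmdTx / the
// various cmd_tx call sites in lp_network.rs.
use futures::channel::mpsc;
use parking_lot::Mutex;

#[derive(Debug)]
struct Cmd(&'static str);

// A synchronous parking_lot::Mutex around the sender: locking never awaits,
// and `try_send` never blocks, so commands can be queued from plain
// (non-async) functions without spawning a task.
struct P2PContextSketch {
    cmd_tx: Mutex<mpsc::Sender<Cmd>>,
}

impl P2PContextSketch {
    fn send(&self, cmd: Cmd) {
        if let Err(e) = self.cmd_tx.lock().try_send(cmd) {
            eprintln!("cmd_tx.try_send error {:?}", e);
        }
    }
}

fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let ctx = P2PContextSketch { cmd_tx: Mutex::new(tx) };

    // Commands are enqueued immediately, in the caller's own order.
    ctx.send(Cmd("subscribe"));
    ctx.send(Cmd("publish"));

    while let Ok(Some(cmd)) = rx.try_next() {
        println!("received {:?}", cmd);
    }
}
```
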
65 changes: 28 additions & 37 deletions mm2src/lp_network.rs
@@ -22,12 +22,13 @@ use common::mm_ctx::{MmArc, MmWeak};
use common::mm_error::prelude::*;
use common::mm_metrics::{ClockOps, MetricsOps};
use derive_more::Display;
-use futures::{channel::oneshot, lock::Mutex as AsyncMutex, StreamExt};
+use futures::{channel::oneshot, StreamExt};
use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse,
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, GossipsubMessage, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
#[cfg(test)] use mocktopus::macros::*;
+use parking_lot::Mutex as PaMutex;
use serde::de;
use std::net::ToSocketAddrs;
use std::sync::Arc;
@@ -62,14 +63,14 @@ pub enum P2PRequest {

pub struct P2PContext {
/// Using Mutex helps to prevent cloning which can actually result to channel being unbounded in case of using 1 tx clone per 1 message.
-pub cmd_tx: AsyncMutex<AdexCmdTx>,
+pub cmd_tx: PaMutex<AdexCmdTx>,
}

#[cfg_attr(test, mockable)]
impl P2PContext {
pub fn new(cmd_tx: AdexCmdTx) -> Self {
P2PContext {
-cmd_tx: AsyncMutex::new(cmd_tx),
+cmd_tx: PaMutex::new(cmd_tx),
}
}
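
The doc comment on cmd_tx above is the reason the sender sits behind a mutex at all rather than being cloned per call: a futures bounded channel reserves one guaranteed slot per Sender clone, so cloning the sender for every message quietly removes the bound. A small self-contained demonstration of that behavior (not project code):

```rust
// Small demonstration (not project code) of the behavior the doc comment
// refers to: a futures bounded channel reserves one slot per Sender clone,
// so cloning the sender per message effectively removes the bound.
use futures::channel::mpsc;

fn main() {
    // Buffer of 0: capacity comes only from the per-sender guaranteed slots.
    let (tx, _rx) = mpsc::channel::<u32>(0);

    let mut tx1 = tx.clone();
    println!("same sender, 1st send ok: {}", tx1.try_send(1).is_ok()); // true
    println!("same sender, 2nd send ok: {}", tx1.try_send(2).is_ok()); // false: full

    // A fresh clone brings its own guaranteed slot, so the queue keeps growing.
    let mut tx2 = tx.clone();
    println!("fresh clone, send ok:     {}", tx2.try_send(3).is_ok()); // true
}
```
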

@@ -103,7 +104,7 @@ pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay
request,
response_channel,
}) => {
-if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel).await {
+if let Err(e) = process_p2p_request(ctx, peer_id, request, response_channel) {
log::error!("Error on process P2P request: {:?}", e);
}
},
@@ -157,16 +158,16 @@ async fn process_p2p_message(
}
}

-async fn process_p2p_request(
+fn process_p2p_request(
ctx: MmArc,
_peer_id: PeerId,
request: Vec<u8>,
response_channel: AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
let result = match request {
-P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req).await,
-P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).await,
+P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
+P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req),
};

let res = match result {
@@ -180,32 +181,29 @@ async fn process_p2p_request(
p2p_ctx
.cmd_tx
.lock()
-.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
Ok(())
}

pub fn broadcast_p2p_msg(ctx: &MmArc, topics: Vec<String>, msg: Vec<u8>) {
let ctx = ctx.clone();
-spawn(async move {
-let cmd = AdexBehaviourCmd::PublishMsg { topics, msg };
-let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
-if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
-log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e);
-};
-});
+let cmd = AdexBehaviourCmd::PublishMsg { topics, msg };
+let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
+if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
+log::error!("broadcast_p2p_msg cmd_tx.send error {:?}", e);
+};
}

/// Subscribe to the given `topic`.
///
/// # Safety
///
/// The function locks the [`MmCtx::p2p_ctx`] mutex.
-pub async fn subscribe_to_topic(ctx: &MmArc, topic: String) {
+pub fn subscribe_to_topic(ctx: &MmArc, topic: String) {
let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx);
let cmd = AdexBehaviourCmd::Subscribe { topic };
-if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
+if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
log::error!("subscribe_to_topic cmd_tx.send error {:?}", e);
};
}
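
The effect of dropping the spawn wrappers: with spawn, the caller returned before the command was actually pushed onto cmd_tx, so a P2P command issued later but sent synchronously could be enqueued first. The toy program below (hypothetical, not project code) makes that reordering visible using a futures LocalPool executor:

```rust
// Toy illustration (hypothetical names): a command sent from inside `spawn`
// is only enqueued when the executor polls that task, so a command issued
// later but sent synchronously can overtake it.
use futures::channel::mpsc;
use futures::executor::LocalPool;
use futures::task::LocalSpawnExt;
use parking_lot::Mutex;
use std::sync::Arc;

fn main() {
    let (tx, mut rx) = mpsc::channel::<&'static str>(8);
    let tx = Arc::new(Mutex::new(tx));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // Old pattern: fire-and-forget — the send happens "sometime later".
    let tx_clone = Arc::clone(&tx);
    spawner
        .spawn_local(async move {
            tx_clone.lock().try_send("first (spawned)").unwrap();
        })
        .unwrap();

    // New pattern: synchronous send — enqueued immediately.
    tx.lock().try_send("second (direct)").unwrap();

    pool.run();
    while let Ok(Some(msg)) = rx.try_next() {
        println!("{}", msg); // prints "second (direct)" before "first (spawned)"
    }
}
```
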
@@ -225,7 +223,6 @@ pub async fn request_any_relay<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
-.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
match response_rx
@@ -262,7 +259,6 @@ pub async fn request_relays<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
-.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
let responses = response_rx
@@ -288,7 +284,6 @@ pub async fn request_peers<T: de::DeserializeOwned>(
p2p_ctx
.cmd_tx
.lock()
-.await
.try_send(cmd)
.map_to_mm(|e| P2PRequestError::SendError(e.to_string()))?;
let responses = response_rx
@@ -339,27 +334,23 @@ fn parse_peers_responses<T: de::DeserializeOwned>(

pub fn propagate_message(ctx: &MmArc, message_id: MessageId, propagation_source: PeerId) {
let ctx = ctx.clone();
-spawn(async move {
-let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
-let cmd = AdexBehaviourCmd::PropagateMessage {
-message_id,
-propagation_source,
-};
-if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
-log::error!("propagate_message cmd_tx.send error {:?}", e);
-};
-});
+let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
+let cmd = AdexBehaviourCmd::PropagateMessage {
+message_id,
+propagation_source,
+};
+if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
+log::error!("propagate_message cmd_tx.send error {:?}", e);
+};
}

pub fn add_reserved_peer_addresses(ctx: &MmArc, peer: PeerId, addresses: PeerAddresses) {
let ctx = ctx.clone();
-spawn(async move {
-let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
-let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses };
-if let Err(e) = p2p_ctx.cmd_tx.lock().await.try_send(cmd) {
-log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e);
-};
-});
+let p2p_ctx = P2PContext::fetch_from_mm_arc(&ctx);
+let cmd = AdexBehaviourCmd::AddReservedPeer { peer, addresses };
+if let Err(e) = p2p_ctx.cmd_tx.lock().try_send(cmd) {
+log::error!("add_reserved_peer_addresses cmd_tx.send error {:?}", e);
+};
}

#[derive(Debug, Display)]