Skip to content

Commit

Permalink
fix(core/mempool): improve perf of retrieve transactions (#4710)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi authored Sep 22, 2022
1 parent 4551ac3 commit f55762e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 134 deletions.
17 changes: 6 additions & 11 deletions base_layer/core/benches/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod benches {
mod benches {
use std::sync::Arc;

use criterion::{criterion_group, BatchSize, Criterion};
use criterion::{criterion_group, Criterion};
use futures::future::try_join_all;
use tari_common::configuration::Network;
use tari_core::{
Expand Down Expand Up @@ -103,16 +103,11 @@ mod benches {
OutputFeatures::default(),
));
c.bench_function("Mempool Insert", move |b| {
let mut offset = 0;
b.iter_batched(
|| {
let batch = transactions[offset..offset + 10].to_vec();
offset = (offset + 10) % NUM_TXNS;
batch
},
|txns| runtime.block_on(async { mempool.insert_all(txns).await.unwrap() }),
BatchSize::SmallInput,
);
let mut idx = 0;
b.iter(|| {
runtime.block_on(async { mempool.insert(transactions[idx].clone()).await.unwrap() });
idx = (idx + 1) % NUM_TXNS;
});
});
}

Expand Down
36 changes: 33 additions & 3 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use log::*;
use tari_common_types::types::{PrivateKey, Signature};
Expand Down Expand Up @@ -79,15 +79,25 @@ impl MempoolStorage {
.first()
.map(|k| k.excess_sig.get_signature().to_hex())
.unwrap_or_else(|| "None?!".into());
let timer = Instant::now();
debug!(target: LOG_TARGET, "Inserting tx into mempool: {}", tx_id);
match self.validator.validate(&tx) {
Ok(()) => {
debug!(
target: LOG_TARGET,
"Transaction {} is VALID, inserting in unconfirmed pool", tx_id
"Transaction {} is VALID ({:.2?}), inserting in unconfirmed pool in",
tx_id,
timer.elapsed()
);
let timer = Instant::now();
let weight = self.get_transaction_weighting(0);
self.unconfirmed_pool.insert(tx, None, &weight);
debug!(
target: LOG_TARGET,
"Transaction {} inserted in {:.2?}",
tx_id,
timer.elapsed()
);
TxStorageResponse::UnconfirmedPool
},
Err(ValidationError::UnknownInputs(dependent_outputs)) => {
Expand Down Expand Up @@ -146,16 +156,36 @@ impl MempoolStorage {
published_block.header.hash().to_hex(),
published_block.body.to_counts_string()
);
let timer = Instant::now();
// Move published txs to ReOrgPool and discard double spends
let removed_transactions = self
.unconfirmed_pool
.remove_published_and_discard_deprecated_transactions(published_block);
debug!(
target: LOG_TARGET,
"{} transactions removed from unconfirmed pool in {:.2?}, moving them to reorg pool for block #{} ({}) {}",
removed_transactions.len(),
timer.elapsed(),
published_block.header.height,
published_block.header.hash().to_hex(),
published_block.body.to_counts_string()
);
let timer = Instant::now();
self.reorg_pool
.insert_all(published_block.header.height, removed_transactions);

debug!(
target: LOG_TARGET,
"Transactions added to reorg pool in {:.2?} for block #{} ({}) {}",
timer.elapsed(),
published_block.header.height,
published_block.header.hash().to_hex(),
published_block.body.to_counts_string()
);
let timer = Instant::now();
self.unconfirmed_pool.compact();
self.reorg_pool.compact();

debug!(target: LOG_TARGET, "Compaction took {:.2?}", timer.elapsed());
debug!(target: LOG_TARGET, "{}", self.stats());
Ok(())
}
Expand Down
128 changes: 61 additions & 67 deletions base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,13 @@ impl MempoolService {

// Outbound tx messages from the OutboundMempoolServiceInterface
Some((txn, excluded_peers)) = outbound_tx_stream.recv() => {
let _res = handle_outbound_tx(&mut self.outbound_message_service, txn, excluded_peers).await.map_err(|e|
let _res = self.handle_outbound_tx(txn, excluded_peers).await.map_err(|e|
error!(target: LOG_TARGET, "Error sending outbound tx message: {}", e)
);
},

// Incoming transaction messages from the Comms layer
Some(transaction_msg) = inbound_transaction_stream.next() => {
let result = handle_incoming_tx(&mut self.inbound_handlers, transaction_msg).await;
if let Err(e) = result {
error!(
target: LOG_TARGET,
"Failed to handle incoming transaction message: {:?}", e
);
}
}
Some(transaction_msg) = inbound_transaction_stream.next() => self.handle_incoming_tx(transaction_msg),

// Incoming local request messages from the LocalMempoolServiceInterface and other local services
Some(local_request_context) = local_request_stream.next() => {
Expand Down Expand Up @@ -171,67 +163,69 @@ impl MempoolService {
}
});
}
}

async fn handle_incoming_tx(
inbound_handlers: &mut MempoolInboundHandlers,
domain_transaction_msg: DomainMessage<Transaction>,
) -> Result<(), MempoolServiceError> {
let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg;

debug!(
"New transaction received: {}, from: {}",
inner
.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string()),
source_peer.public_key,
);
trace!(
target: LOG_TARGET,
"New transaction: {}, from: {}",
inner,
source_peer.public_key
);
inbound_handlers
.handle_transaction(inner, Some(source_peer.node_id))
.await?;

Ok(())
}
fn handle_incoming_tx(&self, domain_transaction_msg: DomainMessage<Transaction>) {
let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg;

debug!(
"New transaction received: {}, from: {}",
inner
.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string()),
source_peer.public_key,
);
trace!(
target: LOG_TARGET,
"New transaction: {}, from: {}",
inner,
source_peer.public_key
);
let mut inbound_handlers = self.inbound_handlers.clone();
task::spawn(async move {
let result = inbound_handlers
.handle_transaction(inner, Some(source_peer.node_id))
.await;
if let Err(e) = result {
error!(
target: LOG_TARGET,
"Failed to handle incoming transaction message: {:?}", e
);
}
});
}

async fn handle_outbound_tx(
outbound_message_service: &mut OutboundMessageRequester,
tx: Arc<Transaction>,
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError> {
let result = outbound_message_service
.flood(
NodeDestination::Unknown,
OutboundEncryption::ClearText,
exclude_peers,
OutboundDomainMessage::new(
&TariMessageType::NewTransaction,
proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?,
),
format!(
"Outbound mempool tx: {}",
tx.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string())
),
)
.await;

if let Err(e) = result {
return match e {
DhtOutboundError::NoMessagesQueued => Ok(()),
_ => {
async fn handle_outbound_tx(
&mut self,
tx: Arc<Transaction>,
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError> {
let result = self
.outbound_message_service
.flood(
NodeDestination::Unknown,
OutboundEncryption::ClearText,
exclude_peers,
OutboundDomainMessage::new(
&TariMessageType::NewTransaction,
proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?,
),
format!(
"Outbound mempool tx: {}",
tx.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string())
),
)
.await;

match result {
Ok(_) => Ok(()),
Err(DhtOutboundError::NoMessagesQueued) => Ok(()),
Err(e) => {
error!(target: LOG_TARGET, "Handle outbound tx failure. {:?}", e);
Err(MempoolServiceError::OutboundMessageService(e.to_string()))
},
};
}
}

Ok(())
}
Loading

0 comments on commit f55762e

Please sign in to comment.