From f55762ea05e54f7711e893f1c7df4d7b670ddabd Mon Sep 17 00:00:00 2001
From: Stan Bondi
Date: Thu, 22 Sep 2022 15:49:15 +0400
Subject: [PATCH] fix(core/mempool): improve perf of retrieve transactions (#4710)

---
 base_layer/core/benches/mempool.rs            |  17 +--
 .../core/src/mempool/mempool_storage.rs       |  36 ++++-
 .../core/src/mempool/service/service.rs       | 128 ++++++++--------
 .../unconfirmed_pool/unconfirmed_pool.rs      | 141 +++++++++++-------
 4 files changed, 188 insertions(+), 134 deletions(-)

diff --git a/base_layer/core/benches/mempool.rs b/base_layer/core/benches/mempool.rs
index 769ea5218d..042c72eb80 100644
--- a/base_layer/core/benches/mempool.rs
+++ b/base_layer/core/benches/mempool.rs
@@ -31,7 +31,7 @@ mod benches {
 mod benches {
     use std::sync::Arc;
 
-    use criterion::{criterion_group, BatchSize, Criterion};
+    use criterion::{criterion_group, Criterion};
     use futures::future::try_join_all;
     use tari_common::configuration::Network;
     use tari_core::{
@@ -103,16 +103,11 @@ mod benches {
             OutputFeatures::default(),
         ));
         c.bench_function("Mempool Insert", move |b| {
-            let mut offset = 0;
-            b.iter_batched(
-                || {
-                    let batch = transactions[offset..offset + 10].to_vec();
-                    offset = (offset + 10) % NUM_TXNS;
-                    batch
-                },
-                |txns| runtime.block_on(async { mempool.insert_all(txns).await.unwrap() }),
-                BatchSize::SmallInput,
-            );
+            let mut idx = 0;
+            b.iter(|| {
+                runtime.block_on(async { mempool.insert(transactions[idx].clone()).await.unwrap() });
+                idx = (idx + 1) % NUM_TXNS;
+            });
         });
     }
 
diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs
index 31afcd356b..12b79d7e64 100644
--- a/base_layer/core/src/mempool/mempool_storage.rs
+++ b/base_layer/core/src/mempool/mempool_storage.rs
@@ -20,7 +20,7 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::sync::Arc;
+use std::{sync::Arc, time::Instant};
 
 use log::*;
 use tari_common_types::types::{PrivateKey, Signature};
@@ -79,15 +79,25 @@ impl MempoolStorage {
             .first()
             .map(|k| k.excess_sig.get_signature().to_hex())
             .unwrap_or_else(|| "None?!".into());
+        let timer = Instant::now();
         debug!(target: LOG_TARGET, "Inserting tx into mempool: {}", tx_id);
         match self.validator.validate(&tx) {
             Ok(()) => {
                 debug!(
                     target: LOG_TARGET,
-                    "Transaction {} is VALID, inserting in unconfirmed pool", tx_id
+                    "Transaction {} is VALID ({:.2?}), inserting in unconfirmed pool",
+                    tx_id,
+                    timer.elapsed()
                 );
+                let timer = Instant::now();
                 let weight = self.get_transaction_weighting(0);
                 self.unconfirmed_pool.insert(tx, None, &weight);
+                debug!(
+                    target: LOG_TARGET,
+                    "Transaction {} inserted in {:.2?}",
+                    tx_id,
+                    timer.elapsed()
+                );
                 TxStorageResponse::UnconfirmedPool
             },
             Err(ValidationError::UnknownInputs(dependent_outputs)) => {
@@ -146,16 +156,36 @@ impl MempoolStorage {
             published_block.header.hash().to_hex(),
             published_block.body.to_counts_string()
         );
+        let timer = Instant::now();
         // Move published txs to ReOrgPool and discard double spends
         let removed_transactions = self
             .unconfirmed_pool
             .remove_published_and_discard_deprecated_transactions(published_block);
+        debug!(
+            target: LOG_TARGET,
+            "{} transactions removed from unconfirmed pool in {:.2?}, moving them to reorg pool for block #{} ({}) {}",
+            removed_transactions.len(),
+            timer.elapsed(),
+            published_block.header.height,
+            published_block.header.hash().to_hex(),
+            published_block.body.to_counts_string()
+        );
+        let timer = Instant::now();
         self.reorg_pool
             .insert_all(published_block.header.height, removed_transactions);
-
+        debug!(
+            target: LOG_TARGET,
+            "Transactions added to reorg pool in {:.2?} for block #{} ({}) {}",
+            timer.elapsed(),
+            published_block.header.height,
+            published_block.header.hash().to_hex(),
+            published_block.body.to_counts_string()
+        );
+        let timer = Instant::now();
         self.unconfirmed_pool.compact();
         self.reorg_pool.compact();
+        debug!(target: LOG_TARGET, "Compaction took {:.2?}", timer.elapsed());
         debug!(target: LOG_TARGET, "{}", self.stats());
         Ok(())
     }
 
diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs
index 61d01e6fe3..4505fc6b17 100644
--- a/base_layer/core/src/mempool/service/service.rs
+++ b/base_layer/core/src/mempool/service/service.rs
@@ -99,21 +99,13 @@ impl MempoolService {
 
                 // Outbound tx messages from the OutboundMempoolServiceInterface
                 Some((txn, excluded_peers)) = outbound_tx_stream.recv() => {
-                    let _res = handle_outbound_tx(&mut self.outbound_message_service, txn, excluded_peers).await.map_err(|e|
+                    let _res = self.handle_outbound_tx(txn, excluded_peers).await.map_err(|e|
                         error!(target: LOG_TARGET, "Error sending outbound tx message: {}", e)
                     );
                 },
 
                 // Incoming transaction messages from the Comms layer
-                Some(transaction_msg) = inbound_transaction_stream.next() => {
-                    let result = handle_incoming_tx(&mut self.inbound_handlers, transaction_msg).await;
-                    if let Err(e) = result {
-                        error!(
-                            target: LOG_TARGET,
-                            "Failed to handle incoming transaction message: {:?}", e
-                        );
-                    }
-                }
+                Some(transaction_msg) = inbound_transaction_stream.next() => self.handle_incoming_tx(transaction_msg),
 
                 // Incoming local request messages from the LocalMempoolServiceInterface and other local services
                 Some(local_request_context) = local_request_stream.next() => {
@@ -171,67 +163,69 @@ impl MempoolService {
             }
         });
     }
-}
 
-async fn handle_incoming_tx(
-    inbound_handlers: &mut MempoolInboundHandlers,
-    domain_transaction_msg: DomainMessage<Transaction>,
-) -> Result<(), MempoolServiceError> {
-    let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg;
-
-    debug!(
-        "New transaction received: {}, from: {}",
-        inner
-            .first_kernel_excess_sig()
-            .map(|s| s.get_signature().to_hex())
-            .unwrap_or_else(|| "No kernels!".to_string()),
-        source_peer.public_key,
-    );
-    trace!(
-        target: LOG_TARGET,
-        "New transaction: {}, from: {}",
-        inner,
-        source_peer.public_key
-    );
-    inbound_handlers
-        .handle_transaction(inner, Some(source_peer.node_id))
-        .await?;
-
-    Ok(())
-}
+    fn handle_incoming_tx(&self, domain_transaction_msg: DomainMessage<Transaction>) {
+        let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg;
+
+        debug!(
+            "New transaction received: {}, from: {}",
+            inner
+                .first_kernel_excess_sig()
+                .map(|s| s.get_signature().to_hex())
+                .unwrap_or_else(|| "No kernels!".to_string()),
+            source_peer.public_key,
+        );
+        trace!(
+            target: LOG_TARGET,
+            "New transaction: {}, from: {}",
+            inner,
+            source_peer.public_key
+        );
+        let mut inbound_handlers = self.inbound_handlers.clone();
+        task::spawn(async move {
+            let result = inbound_handlers
+                .handle_transaction(inner, Some(source_peer.node_id))
+                .await;
+            if let Err(e) = result {
+                error!(
+                    target: LOG_TARGET,
+                    "Failed to handle incoming transaction message: {:?}", e
+                );
+            }
+        });
+    }
 
-async fn handle_outbound_tx(
-    outbound_message_service: &mut OutboundMessageRequester,
-    tx: Arc<Transaction>,
-    exclude_peers: Vec<NodeId>,
-) -> Result<(), MempoolServiceError> {
-    let result = outbound_message_service
-        .flood(
-            NodeDestination::Unknown,
-            OutboundEncryption::ClearText,
-            exclude_peers,
-            OutboundDomainMessage::new(
-                &TariMessageType::NewTransaction,
-                proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?,
-            ),
-            format!(
-                "Outbound mempool tx: {}",
-                tx.first_kernel_excess_sig()
-                    .map(|s| s.get_signature().to_hex())
-                    .unwrap_or_else(|| "No kernels!".to_string())
-            ),
-        )
-        .await;
-
-    if let Err(e) = result {
-        return match e {
-            DhtOutboundError::NoMessagesQueued => Ok(()),
-            _ => {
+    async fn handle_outbound_tx(
+        &mut self,
+        tx: Arc<Transaction>,
+        exclude_peers: Vec<NodeId>,
+    ) -> Result<(), MempoolServiceError> {
+        let result = self
+            .outbound_message_service
+            .flood(
+                NodeDestination::Unknown,
+                OutboundEncryption::ClearText,
+                exclude_peers,
+                OutboundDomainMessage::new(
+                    &TariMessageType::NewTransaction,
+                    proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?,
+                ),
+                format!(
+                    "Outbound mempool tx: {}",
+                    tx.first_kernel_excess_sig()
+                        .map(|s| s.get_signature().to_hex())
+                        .unwrap_or_else(|| "No kernels!".to_string())
+                ),
+            )
+            .await;
+
+        match result {
+            Ok(_) => Ok(()),
+            Err(DhtOutboundError::NoMessagesQueued) => Ok(()),
+            Err(e) => {
                 error!(target: LOG_TARGET, "Handle outbound tx failure. {:?}", e);
                 Err(MempoolServiceError::OutboundMessageService(e.to_string()))
             },
-        };
+        }
     }
-
-    Ok(())
 }
diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
index bf59316dc8..92995fd441 100644
--- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
+++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
@@ -28,8 +28,8 @@ use std::{
 
 use log::*;
 use serde::{Deserialize, Serialize};
-use tari_common_types::types::{HashOutput, PrivateKey, Signature};
-use tari_utilities::hex::Hex;
+use tari_common_types::types::{FixedHash, HashOutput, PrivateKey, Signature};
+use tokio::time::Instant;
 
 use crate::{
     blocks::Block,
@@ -333,14 +333,15 @@ impl UnconfirmedPool {
         current_transactions: &HashMap>,
         transactions_to_insert: &HashMap>,
     ) -> bool {
-        for (_, tx_to_insert) in transactions_to_insert.iter() {
-            for (_, transaction) in current_transactions.iter() {
-                for input in transaction.body.inputs() {
-                    for tx_input in tx_to_insert.body.inputs() {
-                        if tx_input.output_hash() == input.output_hash() {
-                            return true;
-                        }
-                    }
+        let insert_set = transactions_to_insert
+            .values()
+            .flat_map(|tx| tx.body.inputs())
+            .map(|i| i.output_hash())
+            .collect::<HashSet<_>>();
+        for (_, transaction) in current_transactions.iter() {
+            for input in transaction.body.inputs() {
+                if insert_set.contains(&input.output_hash()) {
+                    return true;
                 }
             }
         }
@@ -377,65 +378,99 @@ impl UnconfirmedPool {
             target: LOG_TARGET,
             "Searching for transactions to remove from unconfirmed pool in block {} ({})",
             published_block.header.height,
-            published_block.header.hash().to_hex(),
+            published_block.header.hash()
         );
-        // Remove all transactions that contain the kernels found in this block
-        let mut to_remove = published_block
-            .body
-            .kernels()
-            .iter()
-            .map(|kernel| kernel.excess_sig.get_signature())
-            .filter_map(|sig| self.txs_by_signature.get(sig))
-            .flatten()
-            .copied()
-            .collect::<Vec<_>>();
-
-        let mut removed_transactions = to_remove
-            .iter()
-            .filter_map(|key| self.remove_transaction(*key))
-            .collect::<Vec<_>>();
+        let mut to_remove;
+        let mut removed_transactions;
+        {
+            // Remove all transactions that contain the kernels found in this block
+            let timer = Instant::now();
+            to_remove = published_block
+                .body
+                .kernels()
+                .iter()
+                .map(|kernel| kernel.excess_sig.get_signature())
+                .filter_map(|sig| self.txs_by_signature.get(sig))
+                .flatten()
+                .copied()
+                .collect::<Vec<_>>();
+            removed_transactions = to_remove
+                .iter()
+                .filter_map(|key| self.remove_transaction(*key))
+                .collect::<Vec<_>>();
+            debug!(
+                target: LOG_TARGET,
+                "Found {} transactions with matching kernel sigs from unconfirmed pool in {:.2?}",
+                to_remove.len(),
+                timer.elapsed()
+            );
+        }
         // Reuse the buffer, clear is very cheap
         to_remove.clear();
-        // Remove all transactions that contain the inputs found in this block
-        to_remove.extend(
-            self.tx_by_key
+        {
+            // Remove all transactions that contain the inputs found in this block
+            let timer = Instant::now();
+            let published_block_hash_set = published_block
+                .body
+                .inputs()
                 .iter()
-                .filter(|(_, tx)| UnconfirmedPool::find_matching_block_input(tx, published_block))
-                .map(|(key, _)| *key),
-        );
+                .map(|i| i.output_hash())
+                .collect::<HashSet<_>>();
+
+            to_remove.extend(
+                self.tx_by_key
+                    .iter()
+                    .filter(|(_, tx)| UnconfirmedPool::find_matching_block_input(tx, &published_block_hash_set))
+                    .map(|(key, _)| *key),
+            );
+
+            removed_transactions.extend(to_remove.iter().filter_map(|key| self.remove_transaction(*key)));
+            debug!(
+                target: LOG_TARGET,
+                "Found {} transactions with matching inputs from unconfirmed pool in {:.2?}",
+                to_remove.len(),
+                timer.elapsed()
+            );
+        }
-        removed_transactions.extend(to_remove.iter().filter_map(|key| self.remove_transaction(*key)));
         to_remove.clear();
-        // Remove all transactions that contain the outputs found in this block
-        to_remove.extend(
-            published_block
-                .body
-                .outputs()
-                .iter()
-                .filter_map(|output| self.txs_by_output.get(&output.hash()))
-                .flatten()
-                .copied(),
-        );
+        {
+            // Remove all transactions that contain the outputs found in this block
+            let timer = Instant::now();
+            to_remove.extend(
+                published_block
+                    .body
+                    .outputs()
+                    .iter()
+                    .filter_map(|output| self.txs_by_output.get(&output.hash()))
+                    .flatten()
+                    .copied(),
+            );
-        removed_transactions.extend(to_remove.iter().filter_map(|key| self.remove_transaction(*key)));
+            removed_transactions.extend(to_remove.iter().filter_map(|key| self.remove_transaction(*key)));
+            debug!(
+                target: LOG_TARGET,
+                "Found {} transactions with matching outputs from unconfirmed pool in {:.2?}",
+                to_remove.len(),
+                timer.elapsed()
+            );
+        }
         removed_transactions
     }
 
     /// Searches a block and transaction for matching inputs
-    fn find_matching_block_input(transaction: &PrioritizedTransaction, published_block: &Block) -> bool {
-        for input in transaction.transaction.body.inputs() {
-            for published_input in published_block.body.inputs() {
-                if published_input.output_hash() == input.output_hash() {
-                    return true;
-                }
-            }
-        }
-        false
+    fn find_matching_block_input(transaction: &PrioritizedTransaction, published_block: &HashSet<FixedHash>) -> bool {
+        transaction
+            .transaction
+            .body
+            .inputs()
+            .iter()
+            .any(|input| published_block.contains(&input.output_hash()))
     }
 
     /// Ensures that all transactions are safely deleted in order and from all storage