diff --git a/base_layer/core/src/mempool/priority/prioritized_transaction.rs b/base_layer/core/src/mempool/priority/prioritized_transaction.rs index 418d3c25d3..8c4c4c6e1f 100644 --- a/base_layer/core/src/mempool/priority/prioritized_transaction.rs +++ b/base_layer/core/src/mempool/priority/prioritized_transaction.rs @@ -76,6 +76,7 @@ pub struct PrioritizedTransaction { pub key: usize, pub transaction: Arc, pub priority: FeePriority, + pub fee_per_byte: u64, pub weight: u64, pub dependent_output_hashes: Vec, } @@ -95,6 +96,7 @@ impl PrioritizedTransaction { Ok(Self { key, priority: FeePriority::new(&transaction, insert_epoch, weight), + fee_per_byte: ((f64::from(transaction.body.get_total_fee()) / weight as f64) * 1000.0) as u64, weight, transaction, dependent_output_hashes: dependent_outputs.unwrap_or_default(), diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs index 4f9eb6b536..5ae3d247fd 100644 --- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs +++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, BinaryHeap, HashMap, HashSet}, sync::Arc, }; @@ -89,6 +89,8 @@ pub struct RetrieveResults { pub transactions_to_insert: Vec>, } +pub type CompleteTransactionBranch = HashMap>, u64, u64)>; + impl UnconfirmedPool { /// Create a new UnconfirmedPool with the specified configuration pub fn new(config: UnconfirmedPoolConfig) -> Self { @@ -172,40 +174,94 @@ impl UnconfirmedPool { } /// Returns a set of the highest priority unconfirmed transactions, that can be included in a block + #[allow(clippy::too_many_lines)] pub fn fetch_highest_priority_txs(&mut self, total_weight: u64) -> Result { + // The process of selection is as follows: + // Assume that all transaction have the same weight for simplicity. A(20)->B(2) means A depends on B and A has + // fee 20 and B has fee 2. A(20)->B(2)->C(14), D(12) + // 1) A will be selected first, but B and C will be piggybacked on A, because overall fee_per_byte is 12, so we + // store it temporarily. + // 2) We look at transaction C with fee per byte 14, it's good, nothing is better. + // 3) We come back to transaction A with fee per byte 12, but now that C is already in, we recompute it's fee + // per byte to 11, and again we store it temporarily. + // 4) Next we process transaction D, it's good, nothing is better. + // 5) And now we proceed finally to transaction A, because there is no other possible better option. + // + // Note, if we store some TX_a that is dependent on some TXs including TX_b. And we remove TX_b (this should + // trigger TX_a fee per byte recompute) before we process TX_a again, then the TX_a fee_per_byte will be lower + // or equal, it will never be higher. Proof by contradiction we remove TX_b sooner then TX_a is process and + // fee_per_byte(TX_a+dependents) > fee_per_byte(TX_a+dependents-TX_b), that would mean that + // fee_per_byte(TX_b)::new(); + // For each transaction we store transactions that depends on it. So when we process it, we can mark all of them + // for recomputing. + let mut depended_on: HashMap> = HashMap::new(); + let mut recompute = HashSet::new(); for (_, tx_key) in self.tx_by_priority.iter().rev() { if selected_txs.contains_key(tx_key) { continue; } - let prioritized_transaction = self .tx_by_key .get(tx_key) .ok_or(UnconfirmedPoolError::StorageOutofSync)?; - + self.check_the_potential_txs( + total_weight, + &mut selected_txs, + &mut curr_weight, + &mut curr_skip_count, + &mut complete_transaction_branch, + &mut potentional_to_add, + &mut depended_on, + &mut recompute, + prioritized_transaction.fee_per_byte, + ); + if curr_skip_count >= self.config.weight_tx_skip_count { + break; + } let mut total_transaction_weight = 0; + let mut total_transaction_fees = 0; let mut candidate_transactions_to_select = HashMap::new(); + let mut potential_transactions_to_remove_and_recheck = Vec::new(); self.get_all_dependent_transactions( prioritized_transaction, &mut candidate_transactions_to_select, &mut potential_transactions_to_remove_and_recheck, &selected_txs, &mut total_transaction_weight, + &mut total_transaction_fees, &mut unique_ids, )?; let total_weight_after_candidates = curr_weight + total_transaction_weight; if total_weight_after_candidates <= total_weight && potential_transactions_to_remove_and_recheck.is_empty() { - if !UnconfirmedPool::find_duplicate_input(&selected_txs, &candidate_transactions_to_select) { - curr_weight += total_transaction_weight; - selected_txs.extend(candidate_transactions_to_select); + for dependend_on_tx_key in candidate_transactions_to_select.keys() { + if dependend_on_tx_key != tx_key { + // Transaction is not depended on itself. + depended_on + .entry(*dependend_on_tx_key) + .and_modify(|v| v.push(tx_key)) + .or_insert_with(|| vec![tx_key]); + } } + let fee_per_byte = (total_transaction_fees as f64 / total_transaction_weight as f64 * 1000.0) as u64; + complete_transaction_branch.insert( + *tx_key, + ( + candidate_transactions_to_select.clone(), + total_transaction_weight, + total_transaction_fees, + ), + ); + potentional_to_add.push((fee_per_byte, *tx_key)); } else { transactions_to_remove_and_recheck.append(&mut potential_transactions_to_remove_and_recheck); // Check if some the next few txs with slightly lower priority wont fit in the remaining space. @@ -215,6 +271,19 @@ impl UnconfirmedPool { } } } + if curr_skip_count < self.config.weight_tx_skip_count { + self.check_the_potential_txs( + total_weight, + &mut selected_txs, + &mut curr_weight, + &mut curr_skip_count, + &mut complete_transaction_branch, + &mut potentional_to_add, + &mut depended_on, + &mut recompute, + 0, + ); + } if !transactions_to_remove_and_recheck.is_empty() { // we need to remove all transactions that need to be rechecked. debug!( @@ -237,6 +306,98 @@ impl UnconfirmedPool { Ok(results) } + fn check_the_potential_txs<'a>( + &self, + total_weight: u64, + selected_txs: &mut HashMap>, + curr_weight: &mut u64, + curr_skip_count: &mut usize, + complete_transaction_branch: &mut CompleteTransactionBranch, + potentional_to_add: &mut BinaryHeap<(u64, TransactionKey)>, + depended_on: &mut HashMap>, + recompute: &mut HashSet<&'a TransactionKey>, + fee_per_byte_threshold: u64, + ) { + while match potentional_to_add.peek() { + Some((fee_per_byte, _)) => *fee_per_byte >= fee_per_byte_threshold, + None => false, + } { + // If the current TXs has lower fee than the ones we already processed, we can add some. + let (_fee_per_byte, tx_key) = potentional_to_add.pop().unwrap(); // Safe, we already checked we have some. + if selected_txs.contains_key(&tx_key) { + continue; + } + // Before we do anything with the top transaction we need to know if needs to be recomputed. + if recompute.contains(&tx_key) { + recompute.remove(&tx_key); + // So we recompute the total fees based on updated weights and fees. + let (_, total_transaction_weight, total_transaction_fees) = + complete_transaction_branch.get(&tx_key).unwrap(); + let fee_per_byte = (*total_transaction_fees as f64 / *total_transaction_weight as f64 * 1000.0) as u64; + potentional_to_add.push((fee_per_byte, tx_key)); + continue; + } + let (candidate_transactions_to_select, total_transaction_weight, _total_transaction_fees) = + complete_transaction_branch.remove(&tx_key).unwrap(); + + let total_weight_after_candidates = *curr_weight + total_transaction_weight; + if total_weight_after_candidates <= total_weight { + if !UnconfirmedPool::find_duplicate_input(selected_txs, &candidate_transactions_to_select) { + *curr_weight += total_transaction_weight; + // So we processed the transaction, let's mark the dependents to be recomputed. + for tx_key in candidate_transactions_to_select.keys() { + self.remove_transaction_from_the_dependants( + *tx_key, + complete_transaction_branch, + depended_on, + recompute, + ); + } + selected_txs.extend(candidate_transactions_to_select); + } + } else { + *curr_skip_count += 1; + if *curr_skip_count >= self.config.weight_tx_skip_count { + break; + } + } + // Some cleanup of what we don't need anymore + complete_transaction_branch.remove(&tx_key); + depended_on.remove(&tx_key); + } + } + + pub fn remove_transaction_from_the_dependants<'a>( + &self, + tx_key: TransactionKey, + complete_transaction_branch: &mut CompleteTransactionBranch, + depended_on: &mut HashMap>, + recompute: &mut HashSet<&'a TransactionKey>, + ) { + if let Some(txs) = depended_on.remove(&tx_key) { + let prioritized_transaction = self + .tx_by_key + .get(&tx_key) + .ok_or(UnconfirmedPoolError::StorageOutofSync) + .unwrap(); + for tx in txs { + if let Some(( + update_candidate_transactions_to_select, + update_total_transaction_weight, + update_total_transaction_fees, + )) = complete_transaction_branch.get_mut(tx) + { + update_candidate_transactions_to_select.remove(&tx_key); + *update_total_transaction_weight -= prioritized_transaction.weight; + *update_total_transaction_fees -= prioritized_transaction.transaction.body.get_total_fee().0; + // We mark it as recompute, we don't have to update the Heap, because it will never be + // better as it was (see the note at the top of the function). + recompute.insert(tx); + } + } + } + } + pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec>, Vec) { // Hashset used to prevent duplicates let mut found = HashSet::new(); @@ -269,6 +430,7 @@ impl UnconfirmedPool { transactions_to_recheck: &mut Vec<(TransactionKey, Arc)>, selected_txs: &HashMap>, total_weight: &mut u64, + total_fees: &mut u64, _unique_ids: &mut HashSet<[u8; 32]>, ) -> Result<(), UnconfirmedPoolError> { for dependent_output in &transaction.dependent_output_hashes { @@ -282,6 +444,7 @@ impl UnconfirmedPool { transactions_to_recheck, selected_txs, total_weight, + total_fees, _unique_ids, )?; @@ -304,6 +467,7 @@ impl UnconfirmedPool { .insert(transaction.key, transaction.transaction.clone()) .is_none() { + *total_fees += transaction.transaction.body.get_total_fee().0; *total_weight += transaction.weight; } diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs index 3863cedf76..9df67c4bfc 100644 --- a/base_layer/core/tests/tests/mempool.rs +++ b/base_layer/core/tests/tests/mempool.rs @@ -455,6 +455,106 @@ async fn test_retrieve() { assert!(retrieved_txs.contains(&tx2[1])); } +#[tokio::test] +#[allow(clippy::identity_op)] +async fn test_zero_conf_no_piggyback() { + // This is the scenario described in fetch_highest_priority_txs function. + let network = Network::LocalNet; + let (mut store, mut blocks, mut outputs, consensus_manager, key_manager) = create_new_blockchain(network).await; + let mempool_validator = TransactionChainLinkedValidator::new(store.clone(), consensus_manager.clone()); + let mempool = Mempool::new( + MempoolConfig::default(), + consensus_manager.clone(), + Box::new(mempool_validator), + ); + let txs = vec![txn_schema!( + from: vec![outputs[0][0].clone()], + to: vec![21 * T, 11 * T, 11 * T, 16 * T] + )]; + // "Mine" Block 1 + generate_new_block( + &mut store, + &mut blocks, + &mut outputs, + txs, + &consensus_manager, + &key_manager, + ) + .await + .unwrap(); + mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap(); + + let (tx_d, _tx_d_out) = spend_utxos( + txn_schema!( + from: vec![outputs[1][1].clone()], + to: vec![5 * T, 5 * T], + fee: 12*uT, + lock: 0, + features: OutputFeatures::default() + ), + &key_manager, + ) + .await; + assert_eq!( + mempool.insert(Arc::new(tx_d.clone())).await.unwrap(), + TxStorageResponse::UnconfirmedPool + ); + let (tx_c, tx_c_out) = spend_utxos( + txn_schema!( + from: vec![outputs[1][0].clone()], + to: vec![15 * T, 5 * T], + fee: 14*uT, + lock: 0, + features: OutputFeatures::default() + ), + &key_manager, + ) + .await; + assert_eq!( + mempool.insert(Arc::new(tx_c.clone())).await.unwrap(), + TxStorageResponse::UnconfirmedPool + ); + + let (tx_b, tx_b_out) = spend_utxos( + txn_schema!( + from: vec![tx_c_out[0].clone()], + to: vec![7 * T, 4 * T], + fee: 2*uT, lock: 0, + features: OutputFeatures::default() + ), + &key_manager, + ) + .await; + assert_eq!( + mempool.insert(Arc::new(tx_b.clone())).await.unwrap(), + TxStorageResponse::UnconfirmedPool + ); + let (tx_a, _tx_a_out) = spend_utxos( + txn_schema!( + from: vec![tx_b_out[1].clone()], + to: vec![2 * T, 1 * T], + fee: 20*uT, + lock: 0, + features: OutputFeatures::default() + ), + &key_manager, + ) + .await; + + assert_eq!( + mempool.insert(Arc::new(tx_a.clone())).await.unwrap(), + TxStorageResponse::UnconfirmedPool + ); + + let weight = mempool.stats().await.unwrap().unconfirmed_weight - 1; + let retrieved_txs = mempool.retrieve(weight).await.unwrap(); + assert_eq!(retrieved_txs.len(), 3); + assert!(retrieved_txs.contains(&Arc::new(tx_d))); + assert!(retrieved_txs.contains(&Arc::new(tx_c))); + assert!(retrieved_txs.contains(&Arc::new(tx_b))); + assert!(!retrieved_txs.contains(&Arc::new(tx_a))); +} + #[tokio::test] #[allow(clippy::identity_op)] #[allow(clippy::too_many_lines)] @@ -834,8 +934,8 @@ async fn test_zero_conf() { assert!(retrieved_txs.contains(&Arc::new(tx22))); assert!(retrieved_txs.contains(&Arc::new(tx23))); assert!(retrieved_txs.contains(&Arc::new(tx24))); - assert!(!retrieved_txs.contains(&Arc::new(tx31))); // Missing - assert!(retrieved_txs.contains(&Arc::new(tx32))); + assert!(retrieved_txs.contains(&Arc::new(tx31))); + assert!(!retrieved_txs.contains(&Arc::new(tx32))); // Missing assert!(retrieved_txs.contains(&Arc::new(tx33))); assert!(retrieved_txs.contains(&Arc::new(tx34))); }