Skip to content

Commit

Permalink
chore: hazop code review changes (#5959)
Browse files Browse the repository at this point in the history
Description
---
Some changes from the hazop meeting

Motivation and Context
---
Clean secure code
  • Loading branch information
SWvheerden authored Nov 17, 2023
1 parent 40fc940 commit 9b90c3f
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 107 deletions.
6 changes: 3 additions & 3 deletions applications/minotari_console_wallet/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ pub fn convert_to_transaction_event(event: String, source: TransactionWrapper) -
direction: "inbound".to_string(),
amount: inbound.amount.as_u64(),
message: inbound.message.clone(),
/// The coinbase are technically Inbound.
/// To determine whether a transaction is coinbase
/// we will check whether the message contains `Coinbase`.
// The coinbase are technically Inbound.
// To determine whether a transaction is coinbase
// we will check whether the message contains `Coinbase`.
is_coinbase: inbound.message.to_lowercase().contains("coinbase"),
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where T: Into<Vec<ListItem<'a>>>
constraints.push(Constraint::Length(1));
let column_areas = Layout::default()
.direction(Direction::Horizontal)
.constraints(constraints.as_ref())
.constraints::<Vec<Constraint>>(constraints)
.margin(1)
.split(area);

Expand Down
34 changes: 34 additions & 0 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use thiserror::Error;
use crate::{
blocks::{BlockError, BlockHeaderValidationError},
chain_storage::ChainStorageError,
common::{BanPeriod, BanReason},
consensus::ConsensusManagerError,
mempool::MempoolError,
proof_of_work::{monero_rx::MergeMineError, DifficultyError},
Expand Down Expand Up @@ -77,3 +78,36 @@ pub enum CommsInterfaceError {
#[error("Transaction error: {0}")]
TransactionError(#[from] TransactionError),
}

impl CommsInterfaceError {
pub fn get_ban_reason(&self) -> Option<BanReason> {
match self {
err @ CommsInterfaceError::UnexpectedApiResponse |
err @ CommsInterfaceError::RequestTimedOut |
err @ CommsInterfaceError::TransportChannelError(_) => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Short,
}),
err @ CommsInterfaceError::InvalidPeerResponse(_) |
err @ CommsInterfaceError::InvalidBlockHeader(_) |
err @ CommsInterfaceError::TransactionError(_) |
err @ CommsInterfaceError::InvalidFullBlock { .. } |
err @ CommsInterfaceError::InvalidRequest { .. } => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
CommsInterfaceError::MempoolError(e) => e.get_ban_reason(),
CommsInterfaceError::ChainStorageError(e) => e.get_ban_reason(),
CommsInterfaceError::MergeMineError(e) => e.get_ban_reason(),
CommsInterfaceError::NoBootstrapNodesConfigured |
CommsInterfaceError::OutboundMessageError(_) |
CommsInterfaceError::BroadcastFailed |
CommsInterfaceError::InternalChannelError(_) |
CommsInterfaceError::DifficultyAdjustmentManagerError(_) |
CommsInterfaceError::InternalError(_) |
CommsInterfaceError::ApiError(_) |
CommsInterfaceError::BlockError(_) |
CommsInterfaceError::DifficultyError(_) => None,
}
}
}
60 changes: 9 additions & 51 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{
cmp::max,
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};
use std::{cmp::max, collections::HashSet, sync::Arc, time::Instant};

use log::*;
use strum_macros::Display;
Expand Down Expand Up @@ -542,7 +537,7 @@ where B: BlockchainBackend + 'static
}

async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
if self.blockchain_db.chain_block_or_orphan_block_exists(block).await? {
if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
debug!(
target: LOG_TARGET,
"Block with hash `{}` already stored",
Expand Down Expand Up @@ -612,13 +607,6 @@ where B: BlockchainBackend + 'static
current_meta.best_block().to_hex(),
source_peer,
);
if excess_sigs.is_empty() {
let block = BlockBuilder::new(header.version)
.with_coinbase_utxo(coinbase_output, coinbase_kernel)
.with_header(header.clone())
.build();
return Ok(block);
}
#[cfg(feature = "metrics")]
metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
Expand Down Expand Up @@ -734,18 +722,6 @@ where B: BlockchainBackend + 'static
{
Ok(Some(block)) => Ok(block),
Ok(None) => {
if let Err(e) = self
.connectivity
.ban_peer_until(
source_peer.clone(),
Duration::from_secs(100),
format!("Peer {} failed to return the block that was requested.", source_peer),
)
.await
{
error!(target: LOG_TARGET, "Failed to ban peer: {}", e);
}

debug!(
target: LOG_TARGET,
"Peer `{}` failed to return the block that was requested.", source_peer
Expand All @@ -760,13 +736,6 @@ where B: BlockchainBackend + 'static
target: LOG_TARGET,
"Peer `{}` sent unexpected API response.", source_peer
);
if let Err(e) = self
.connectivity
.ban_peer(source_peer.clone(), "Peer sent invalid API response".to_string())
.await
{
error!(target: LOG_TARGET, "Failed to ban peer: {}", e);
}
Err(CommsInterfaceError::UnexpectedApiResponse)
},
Err(e) => Err(e),
Expand Down Expand Up @@ -834,7 +803,13 @@ where B: BlockchainBackend + 'static
);
let exclude_peers = source_peer.into_iter().collect();
let new_block_msg = NewBlock::from(&*block);
self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await?;
if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
warn!(
target: LOG_TARGET,
"Failed to propagate block ({}) to network: {}.",
block_hash.to_hex(), e
);
}
}
Ok(block_hash)
},
Expand All @@ -854,23 +829,6 @@ where B: BlockchainBackend + 'static
.unwrap_or_else(|| "<local request>".to_string()),
e
);
match source_peer {
Some(ref source_peer) => {
if let Err(e) = self
.connectivity
.ban_peer(source_peer.clone(), format!("Peer propagated invalid block: {}", e))
.await
{
error!(target: LOG_TARGET, "Failed to ban peer: {}", e);
}
},
// SECURITY: This indicates an issue in the transaction validator.
None => {
#[cfg(feature = "metrics")]
metrics::rejected_local_blocks(block.header.height, &block_hash).inc();
debug!(target: LOG_TARGET, "There may have been an issue in the transaction validator");
},
}
self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
Err(e.into())
},
Expand Down
13 changes: 0 additions & 13 deletions base_layer/core/src/base_node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,6 @@ pub fn rejected_blocks(height: u64, hash: &FixedHash) -> IntCounter {
METER.with_label_values(&[&height.to_string(), &hash.to_hex()])
}

pub fn rejected_local_blocks(height: u64, hash: &FixedHash) -> IntCounter {
static METER: Lazy<IntCounterVec> = Lazy::new(|| {
tari_metrics::register_int_counter_vec(
"base_node::blockchain::rejected_local_blocks",
"Number of local block rejected by the base node",
&["height", "block_hash"],
)
.unwrap()
});

METER.with_label_values(&[&height.to_string(), &hash.to_hex()])
}

pub fn active_sync_peers() -> &'static IntGauge {
static METER: Lazy<IntGauge> = Lazy::new(|| {
tari_metrics::register_int_gauge(
Expand Down
33 changes: 1 addition & 32 deletions base_layer/core/src/base_node/service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,38 +45,7 @@ pub enum BaseNodeServiceError {
impl BaseNodeServiceError {
pub fn get_ban_reason(&self) -> Option<BanReason> {
match self {
BaseNodeServiceError::CommsInterfaceError(comms) => match comms {
err @ CommsInterfaceError::UnexpectedApiResponse | err @ CommsInterfaceError::RequestTimedOut => {
Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Short,
})
},
err @ CommsInterfaceError::InvalidPeerResponse(_) |
err @ CommsInterfaceError::InvalidBlockHeader(_) |
err @ CommsInterfaceError::TransactionError(_) |
err @ CommsInterfaceError::InvalidFullBlock { .. } |
err @ CommsInterfaceError::InvalidRequest { .. } => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
CommsInterfaceError::MempoolError(e) => e.get_ban_reason(),
CommsInterfaceError::TransportChannelError(e) => Some(BanReason {
reason: e.to_string(),
ban_duration: BanPeriod::Short,
}),
CommsInterfaceError::ChainStorageError(e) => e.get_ban_reason(),
CommsInterfaceError::MergeMineError(e) => e.get_ban_reason(),
CommsInterfaceError::NoBootstrapNodesConfigured |
CommsInterfaceError::OutboundMessageError(_) |
CommsInterfaceError::BroadcastFailed |
CommsInterfaceError::InternalChannelError(_) |
CommsInterfaceError::DifficultyAdjustmentManagerError(_) |
CommsInterfaceError::InternalError(_) |
CommsInterfaceError::ApiError(_) |
CommsInterfaceError::BlockError(_) |
CommsInterfaceError::DifficultyError(_) => None,
},
BaseNodeServiceError::CommsInterfaceError(e) => e.get_ban_reason(),
BaseNodeServiceError::DhtOutboundError(_) => None,
err @ BaseNodeServiceError::InvalidRequest(_) |
err @ BaseNodeServiceError::InvalidResponse(_) |
Expand Down
6 changes: 0 additions & 6 deletions base_layer/core/src/blocks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ pub struct BlockBuilder {
inputs: Vec<TransactionInput>,
outputs: Vec<TransactionOutput>,
kernels: Vec<TransactionKernel>,
total_fee: MicroMinotari,
}

impl BlockBuilder {
Expand All @@ -172,7 +171,6 @@ impl BlockBuilder {
inputs: Vec::new(),
outputs: Vec::new(),
kernels: Vec::new(),
total_fee: MicroMinotari::from(0),
}
}

Expand All @@ -196,10 +194,6 @@ impl BlockBuilder {

/// This function adds the provided transaction kernels to the block WITHOUT updating kernel_mmr_size in the header
pub fn add_kernels(mut self, mut kernels: Vec<TransactionKernel>) -> Self {
for kernel in &kernels {
// Saturating add is used here to prevent overflow; invalid fees will be caught by block validation
self.total_fee = self.total_fee.saturating_add(kernel.fee);
}
self.kernels.append(&mut kernels);
self
}
Expand Down
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(chain_block_or_orphan_block_exists(block_hash: BlockHash) -> bool, "block_exists");

make_async_fn!(chain_header_or_orphan_exists(block_hash: BlockHash) -> bool, "header_exists");

make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists");

make_async_fn!(add_bad_block(hash: BlockHash, height: u64) -> (), "add_bad_block");
Expand Down
12 changes: 11 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,9 @@ where B: BlockchainBackend
after_lock - before_lock,
);

// If this is true, we already got the header in our database due to header-sync, between us starting the
// process of processing an incoming block and now getting a write-lock on the database. Block-sync will
// download the body for us, so we can safely exit here.
if db.contains(&DbKey::HeaderHash(block_hash))? {
return Ok(BlockAddResult::BlockExists);
}
Expand Down Expand Up @@ -1121,6 +1124,12 @@ where B: BlockchainBackend
Ok(db.fetch_block_accumulated_data(&hash)?.is_some() || db.contains(&DbKey::OrphanBlock(hash))?)
}

/// Returns true if this block header in the chain, or is orphaned.
pub fn chain_header_or_orphan_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
Ok(db.contains(&DbKey::HeaderHash(hash))? || db.contains(&DbKey::OrphanBlock(hash))?)
}

/// Returns true if this block exists in the chain, or is orphaned.
pub fn bad_block_exists(&self, hash: BlockHash) -> Result<bool, ChainStorageError> {
let db = self.db_read_access()?;
Expand Down Expand Up @@ -1328,7 +1337,7 @@ pub fn calculate_mmr_roots<T: BlockchainBackend>(
)
} else {
// MR is unchanged except for epoch boundary
let tip_header = fetch_header(db, block_height - 1)?;
let tip_header = fetch_header(db, block_height.saturating_sub(1))?;
(tip_header.validator_node_mr, 0)
};

Expand Down Expand Up @@ -2389,6 +2398,7 @@ fn prune_to_height<T: BlockchainBackend>(db: &mut T, target_horizon_height: u64)

txn.prune_outputs_spent_at_hash(*header.hash());
txn.delete_all_inputs_in_block(*header.hash());
// Write the transaction periodically so it wont run into the transaction size limit. 100 was a safe limit.
if txn.operations().len() >= 100 {
txn.set_pruned_height(block_to_prune);
db.write(mem::take(&mut txn))?;
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2152,6 +2152,7 @@ impl BlockchainBackend for LMDBDatabase {
})?;
}

// Sort the orphans by age, oldest first
orphans.sort_by(|a, b| a.0.cmp(&b.0));
let mut txn = DbTransaction::new();
for (removed_count, (height, block_hash)) in orphans.into_iter().enumerate() {
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/network_discovery/discovering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Discovering {
})
.await?;
let mut counter = 0;
#[allow(clippy::mutable_key_type)]
let mut peers_received = HashSet::new();
while let Some(resp) = stream.next().await {
counter += 1;
Expand Down

0 comments on commit 9b90c3f

Please sign in to comment.