From db9eb9641836f6bf3e878a3065c08661a4c57254 Mon Sep 17 00:00:00 2001 From: David Main <51991544+StriderDM@users.noreply.github.com> Date: Mon, 18 Oct 2021 15:54:37 +0200 Subject: [PATCH 1/3] feat: improve logging for tari_mining_node (#3449) Description --- This PR improves the logging for tari_mining_node by making use of the log4rs target; it also sets the default logging level to info. This PR also adds basic statistics for pool mining, which are automatically displayed at a 20s interval. Motivation and Context --- How Has This Been Tested? --- Manually --- applications/tari_mining_node/src/main.rs | 83 +++++++++----- applications/tari_mining_node/src/miner.rs | 26 +++-- .../src/stratum/controller.rs | 104 ++++++++++++------ .../tari_mining_node/src/stratum/mod.rs | 1 + .../stratum/stratum_controller/controller.rs | 90 +++++++++++++-- .../src/stratum/stratum_miner/miner.rs | 73 ++++++++---- .../src/stratum/stratum_statistics/mod.rs | 1 + .../src/stratum/stratum_statistics/stats.rs | 100 +++++++++++++++++ .../src/stratum/stratum_types/mod.rs | 2 +- .../src/stratum/stratum_types/rpc_error.rs | 2 +- .../{worker_status.rs => submit_response.rs} | 11 +- common/logging/log4rs_sample_mining_node.yml | 7 +- 12 files changed, 390 insertions(+), 110 deletions(-) create mode 100644 applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs create mode 100644 applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs rename applications/tari_mining_node/src/stratum/stratum_types/{worker_status.rs => submit_response.rs} (91%) diff --git a/applications/tari_mining_node/src/main.rs b/applications/tari_mining_node/src/main.rs index 687b1ce52b3..568bc67eb99 100644 --- a/applications/tari_mining_node/src/main.rs +++ b/applications/tari_mining_node/src/main.rs @@ -33,7 +33,11 @@ mod utils; use crate::{ miner::MiningReport, - stratum::{stratum_controller::controller::Controller, stratum_miner::miner::StratumMiner}, + stratum::{ + stratum_controller::controller::Controller, + stratum_miner::miner::StratumMiner, + stratum_statistics::stats::Statistics, + }, }; use errors::{err_empty, MinerError}; use miner::Miner; @@ -42,6 +46,7 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, Arc, + RwLock, }, thread, time::Instant, @@ -61,6 +66,9 @@ use tokio::{runtime::Runtime, time::sleep}; use tonic::transport::Channel; use utils::{coinbase_request, extract_outputs_and_kernels}; +pub const LOG_TARGET: &str = "tari_mining_node::miner::main"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::main"; + /// Application entry point fn main() { let rt = Runtime::new().expect("Failed to start tokio runtime"); @@ -68,7 +76,7 @@ fn main() { Ok(_) => std::process::exit(0), Err(exit_code) => { eprintln!("Fatal error: {:?}", exit_code); - error!("Exiting with code: {:?}", exit_code); + error!(target: LOG_TARGET, "Exiting with code: {:?}", exit_code); std::process::exit(exit_code.as_i32()) }, } @@ -83,8 +91,8 @@ async fn main_inner() -> Result<(), ExitCodes> { config.mining_worker_name = global.mining_worker_name.clone(); config.mining_wallet_address = global.mining_wallet_address.clone(); config.mining_pool_address = global.mining_pool_address.clone(); - debug!("{:?}", bootstrap); - debug!("{:?}", config); + debug!(target: LOG_TARGET_FILE, "{:?}", bootstrap); + debug!(target: LOG_TARGET_FILE, "{:?}", config); if !config.mining_wallet_address.is_empty() && !config.mining_pool_address.is_empty() { let url = config.mining_pool_address.clone(); @@ -94,18 +102,25 @@ async fn
main_inner() -> Result<(), ExitCodes> { if !config.mining_worker_name.is_empty() { miner_address += &format!("{}{}", ".", &config.mining_worker_name); } - let mut mc = Controller::new().unwrap_or_else(|e| { + let stats = Arc::new(RwLock::new(Statistics::default())); + let mut mc = Controller::new(stats.clone()).unwrap_or_else(|e| { + debug!(target: LOG_TARGET_FILE, "Error loading mining controller: {}", e); panic!("Error loading mining controller: {}", e); }); - let cc = stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone()) - .unwrap_or_else(|e| { - panic!("Error loading stratum client controller: {:?}", e); - }); + let cc = + stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone(), stats.clone()) + .unwrap_or_else(|e| { + debug!( + target: LOG_TARGET_FILE, + "Error loading stratum client controller: {:?}", e + ); + panic!("Error loading stratum client controller: {:?}", e); + }); let miner_stopped = Arc::new(AtomicBool::new(false)); let client_stopped = Arc::new(AtomicBool::new(false)); mc.set_client_tx(cc.tx.clone()); - let mut miner = StratumMiner::new(config); + let mut miner = StratumMiner::new(config, stats); if let Err(e) = miner.start_solvers() { println!("Error. Please check logs for further info."); println!("Error details:"); @@ -118,7 +133,7 @@ async fn main_inner() -> Result<(), ExitCodes> { .name("mining_controller".to_string()) .spawn(move || { if let Err(e) = mc.run(miner) { - error!("Error. Please check logs for further info: {:?}", e); + error!(target: LOG_TARGET, "Error. Please check logs for further info: {:?}", e); return; } miner_stopped_internal.store(true, Ordering::Relaxed); @@ -142,19 +157,21 @@ async fn main_inner() -> Result<(), ExitCodes> { Ok(()) } else { config.mine_on_tip_only = global.mine_on_tip_only; - debug!("mine_on_tip_only is {}", config.mine_on_tip_only); - + debug!( + target: LOG_TARGET_FILE, + "mine_on_tip_only is {}", config.mine_on_tip_only + ); let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?; let mut blocks_found: u64 = 0; loop { - debug!("Starting new mining cycle"); + debug!(target: LOG_TARGET_FILE, "Starting new mining cycle"); match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await { err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => { // Any GRPC error we will try to reconnect with a standard delay - error!("Connection error: {:?}", err); + error!(target: LOG_TARGET, "Connection error: {:?}", err); loop { - debug!("Holding for {:?}", config.wait_timeout()); + debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout()); sleep(config.wait_timeout()).await; match connect(&config, &global).await { Ok((nc, wc)) => { @@ -163,22 +180,28 @@ async fn main_inner() -> Result<(), ExitCodes> { break; }, Err(err) => { - error!("Connection error: {:?}", err); + error!(target: LOG_TARGET, "Connection error: {:?}", err); continue; }, } } }, Err(MinerError::MineUntilHeightReached(h)) => { - info!("Prescribed blockchain height {} reached. Aborting ...", h); + info!( + target: LOG_TARGET, + "Prescribed blockchain height {} reached. Aborting ...", h + ); return Ok(()); }, Err(MinerError::MinerLostBlock(h)) => { - info!("Height {} already mined by other node. Restarting ...", h); + info!( + target: LOG_TARGET, + "Height {} already mined by other node. 
Restarting ...", h + ); }, Err(err) => { - error!("Error: {:?}", err); - debug!("Holding for {:?}", config.wait_timeout()); + error!(target: LOG_TARGET, "Error: {:?}", err); + debug!(target: LOG_TARGET_FILE, "Holding for {:?}", config.wait_timeout()); sleep(config.wait_timeout()).await; }, Ok(submitted) => { @@ -201,10 +224,10 @@ async fn connect( global: &GlobalConfig, ) -> Result<(BaseNodeClient, WalletClient), MinerError> { let base_node_addr = config.base_node_addr(global); - info!("Connecting to base node at {}", base_node_addr); + info!(target: LOG_TARGET, "Connecting to base node at {}", base_node_addr); let node_conn = BaseNodeClient::connect(base_node_addr.clone()).await?; let wallet_addr = config.wallet_addr(global); - info!("Connecting to wallet at {}", wallet_addr); + info!(target: LOG_TARGET, "Connecting to wallet at {}", wallet_addr); let wallet_conn = WalletClient::connect(wallet_addr.clone()).await?; Ok((node_conn, wallet_conn)) @@ -267,8 +290,8 @@ async fn mining_cycle( if report.difficulty < min_diff { submit = false; debug!( - "Mined difficulty {} below minimum difficulty {}. Not submitting.", - report.difficulty, min_diff + target: LOG_TARGET_FILE, + "Mined difficulty {} below minimum difficulty {}. Not submitting.", report.difficulty, min_diff ); } } @@ -276,8 +299,10 @@ async fn mining_cycle( if report.difficulty > max_diff { submit = false; debug!( + target: LOG_TARGET_FILE, "Mined difficulty {} greater than maximum difficulty {}. Not submitting.", - report.difficulty, max_diff + report.difficulty, + max_diff ); } } @@ -285,8 +310,11 @@ async fn mining_cycle( // Mined a block fitting the difficulty let block_header = BlockHeader::try_from(header.clone()).map_err(MinerError::Conversion)?; info!( + target: LOG_TARGET, "Miner {} found block header {} with difficulty {:?}", - report.miner, block_header, report.difficulty, + report.miner, + block_header, + report.difficulty, ); let mut mined_block = block.clone(); mined_block.header = Some(header); @@ -313,6 +341,7 @@ async fn mining_cycle( async fn display_report(report: &MiningReport, config: &MinerConfig) { let hashrate = report.hashes as f64 / report.elapsed.as_micros() as f64; debug!( + target: LOG_TARGET_FILE, "Miner {} reported {:.2}MH/s with total {:.2}MH/s over {} threads. Height: {}. 
Target: {})", report.miner, hashrate, diff --git a/applications/tari_mining_node/src/miner.rs b/applications/tari_mining_node/src/miner.rs index e2bc4352a99..21f7b891f3e 100644 --- a/applications/tari_mining_node/src/miner.rs +++ b/applications/tari_mining_node/src/miner.rs @@ -33,6 +33,8 @@ use std::{ use tari_app_grpc::{conversions::timestamp, tari_rpc::BlockHeader}; use thread::JoinHandle; +pub const LOG_TARGET: &str = "tari_mining_node::miner::standalone"; + // Identify how often mining thread is reporting / checking context // ~400_000 hashes per second const REPORTING_FREQUENCY: u64 = 3_000_000; @@ -107,20 +109,20 @@ impl Stream for Miner { type Item = MiningReport; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - trace!("Polling Miner"); + trace!(target: LOG_TARGET, "Polling Miner"); // First poll would start all the threads passing async context waker if self.threads.is_empty() && self.num_threads > 0 { debug!( - "Starting {} mining threads for target difficulty {}", - self.num_threads, self.target_difficulty + target: LOG_TARGET, + "Starting {} mining threads for target difficulty {}", self.num_threads, self.target_difficulty ); self.start_threads(ctx); return Poll::Pending; } else if self.num_threads == 0 { - error!("Cannot mine: no mining threads"); + error!(target: LOG_TARGET, "Cannot mine: no mining threads"); return Poll::Ready(None); } else if self.channels.is_empty() { - debug!("Finished mining"); + debug!(target: LOG_TARGET, "Finished mining"); return Poll::Ready(None); } @@ -167,14 +169,14 @@ pub fn mining_task( let mut hasher = BlockHeaderSha3::new(header).unwrap(); hasher.random_nonce(); // We're mining over here! - info!("Mining thread {} started", miner); + info!(target: LOG_TARGET, "Mining thread {} started", miner); // Mining work loop { let difficulty = hasher.difficulty(); if difficulty >= target_difficulty { debug!( - "Miner {} found nonce {} with matching difficulty {}", - miner, hasher.nonce, difficulty + target: LOG_TARGET, + "Miner {} found nonce {} with matching difficulty {}", miner, hasher.nonce, difficulty ); if let Err(err) = sender.try_send(MiningReport { miner, @@ -186,10 +188,10 @@ pub fn mining_task( header: Some(hasher.into_header()), target_difficulty, }) { - error!("Miner {} failed to send report: {}", miner, err); + error!(target: LOG_TARGET, "Miner {} failed to send report: {}", miner, err); } waker.wake(); - info!("Mining thread {} stopped", miner); + info!(target: LOG_TARGET, "Mining thread {} stopped", miner); return; } if hasher.nonce % REPORTING_FREQUENCY == 0 { @@ -204,9 +206,9 @@ pub fn mining_task( target_difficulty, }); waker.clone().wake(); - trace!("Reporting from {} result {:?}", miner, res); + trace!(target: LOG_TARGET, "Reporting from {} result {:?}", miner, res); if let Err(TrySendError::Disconnected(_)) = res { - info!("Mining thread {} disconnected", miner); + info!(target: LOG_TARGET, "Mining thread {} disconnected", miner); return; } hasher.set_forward_timestamp(timestamp().seconds as u64); diff --git a/applications/tari_mining_node/src/stratum/controller.rs b/applications/tari_mining_node/src/stratum/controller.rs index 1fb6e76256d..288e94157ec 100644 --- a/applications/tari_mining_node/src/stratum/controller.rs +++ b/applications/tari_mining_node/src/stratum/controller.rs @@ -20,16 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
// -use crate::stratum::{error::Error, stratum_types as types, stream::Stream}; +use crate::stratum::{error::Error, stratum_statistics::stats, stratum_types as types, stream::Stream}; use log::*; use std::{ self, io::{BufRead, ErrorKind, Write}, - sync::mpsc, + sync::{mpsc, Arc, RwLock}, thread, }; +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; + pub struct Controller { server_url: String, server_login: Option, @@ -40,6 +43,7 @@ pub struct Controller { pub tx: mpsc::Sender, miner_tx: mpsc::Sender, last_request_id: String, + stats: Arc>, } // fn invalid_error_response() -> types::RpcError { @@ -56,6 +60,7 @@ impl Controller { server_password: Option, server_tls_enabled: Option, miner_tx: mpsc::Sender, + stats: Arc>, ) -> Result { let (tx, rx) = mpsc::channel::(); Ok(Controller { @@ -68,6 +73,7 @@ impl Controller { rx, miner_tx, last_request_id: "".to_string(), + stats, }) } @@ -96,7 +102,7 @@ impl Controller { Err(ref e) if e.kind() == ErrorKind::BrokenPipe => Err(Error::Connection("broken pipe".to_string())), Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(None), Err(e) => { - error!("Communication error with stratum server: {}", e); + error!(target: LOG_TARGET, "Communication error with stratum server: {}", e); Err(Error::Connection("broken pipe".to_string())) }, } @@ -106,7 +112,7 @@ impl Controller { if self.stream.is_none() { return Err(Error::Connection(String::from("No server connection"))); } - debug!("sending request: {}", message); + debug!(target: LOG_TARGET_FILE, "sending request: {}", message); let _ = self.stream.as_mut().unwrap().write(message.as_bytes()); let _ = self.stream.as_mut().unwrap().write(b"\n"); let _ = self.stream.as_mut().unwrap().flush(); @@ -172,7 +178,10 @@ impl Controller { } fn send_message_submit(&mut self, job_id: u64, hash: String, nonce: u64) -> Result<(), Error> { - info!("Submitting Solution with hash {} and nonce {}", hash, nonce); + info!( + target: LOG_TARGET, + "Submitting share with hash {} and nonce {}", hash, nonce + ); let params_in = types::submit_params::SubmitParams { id: self.last_request_id.to_string(), job_id, @@ -211,15 +220,15 @@ impl Controller { } pub fn handle_request(&mut self, req: types::rpc_request::RpcRequest) -> Result<(), Error> { - debug!("Received request type: {}", req.method); + debug!(target: LOG_TARGET_FILE, "Received request type: {}", req.method); match req.method.as_str() { "job" => match req.params { None => Err(Error::Request("No params in job request".to_owned())), Some(params) => { let job = serde_json::from_value::(params)?; info!( - "Got a new job for height {} with target difficulty {}", - job.height, job.target + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", job.height, job.target ); self.send_miner_job(job) }, @@ -228,13 +237,31 @@ impl Controller { } } + fn handle_error(&mut self, error: types::rpc_error::RpcError) { + if vec![-1, 24].contains(&error.code) { + // unauthorized + let _ = self.send_login(); + } else if vec![21, 20, 22, 23, 25].contains(&error.code) { + // problem with template + let _ = self.send_message_get_job_template(); + } + } + + #[allow(clippy::cognitive_complexity)] pub fn handle_response(&mut self, res: types::rpc_response::RpcResponse) -> Result<(), Error> { - debug!("Received response with id: {}", res.id); + debug!(target: LOG_TARGET_FILE, "Received response with id: {}", res.id); match res.result { Some(result) => { let 
login_response = serde_json::from_value::(result.clone()); if let Ok(st) = login_response { - info!("Successful login to server, worker identifier is {}", st.id); + info!( + target: LOG_TARGET, + "Successful login to server, worker identifier is {}", st.id + ); + info!( + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", st.job.height, st.job.target + ); self.last_request_id = st.id; let _ = self.send_miner_job(st.job); return Ok(()); @@ -242,31 +269,42 @@ impl Controller { let job_response = serde_json::from_value::(result.clone()); if let Ok(st) = job_response { info!( - "Got a new job for height {} with target difficulty {}", - st.height, st.target + target: LOG_TARGET, + "Got a new job for height {} with target difficulty {}", st.height, st.target ); let _ = self.send_miner_job(st); return Ok(()); }; + let submit_response = serde_json::from_value::(result.clone()); + if let Ok(st) = submit_response { + let error = st.error; + if let Some(error) = error { + // rejected share + self.handle_error(error.clone()); + info!(target: LOG_TARGET, "Rejected"); + debug!(target: LOG_TARGET_FILE, "Share rejected: {:?}", error); + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solution_stats.rejected += 1; + } else { + // accepted share + info!(target: LOG_TARGET, "Accepted"); + debug!(target: LOG_TARGET_FILE, "Share accepted: {:?}", st.status); + } + return Ok(()); + } let rpc_response = serde_json::from_value::(result); if let Ok(st) = rpc_response { let error = st.error; if let Some(error) = error { - if vec![-1, 24].contains(&error.code) { - // unauthorized - let _ = self.send_login(); - } else if vec![21, 20, 22, 23, 25].contains(&error.code) { - // problem with template - let _ = self.send_message_get_job_template(); - } - } else { - info!("{:?}", st.result); + self.handle_error(error); } return Ok(()); + } else { + debug!(target: LOG_TARGET_FILE, "RPC Response: {:?}", rpc_response); }; }, None => { - error!("{:?}", res); + error!(target: LOG_TARGET, "RPC error: {:?}", res); }, } Ok(()) @@ -298,7 +336,7 @@ impl Controller { self.stream = None; } else { let status = format!("Connection Status: Connected to server at {}.", self.server_url); - info!("{}", status); + info!(target: LOG_TARGET, "{}", status); } next_server_retry = time::get_time().sec + server_retry_interval; if self.stream.is_none() { @@ -319,41 +357,41 @@ impl Controller { Ok(Some(m)) => { // figure out what kind of message, // and dispatch appropriately - debug!("Received message: {}", m); + debug!(target: LOG_TARGET_FILE, "Received message: {}", m); // Deserialize to see what type of object it is if let Ok(v) = serde_json::from_str::(&m) { // Is this a response or request? 
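// Only a "job" notification is dispatched as a request from the pool; anything else is treated as a response to one of this client's own calls.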
if v["method"] == "job" { // this is a request match serde_json::from_str::(&m) { - Err(e) => error!("Error parsing request {} : {:?}", m, e), + Err(e) => error!(target: LOG_TARGET, "Error parsing request {} : {:?}", m, e), Ok(request) => { if let Err(err) = self.handle_request(request) { - error!("Error handling request {} : :{:?}", m, err) + error!(target: LOG_TARGET, "Error handling request {} : :{:?}", m, err) } }, } } else { // this is a response match serde_json::from_str::(&m) { - Err(e) => error!("Error parsing response {} : {:?}", m, e), + Err(e) => error!(target: LOG_TARGET, "Error parsing response {} : {:?}", m, e), Ok(response) => { if let Err(err) = self.handle_response(response) { - error!("Error handling response {} : :{:?}", m, err) + error!(target: LOG_TARGET, "Error handling response {} : :{:?}", m, err) } }, } } continue; } else { - error!("Error parsing message: {}", m) + error!(target: LOG_TARGET, "Error parsing message: {}", m) } }, Ok(None) => { // noop, nothing to read for this interval }, Err(e) => { - error!("Error reading message: {:?}", e); + error!(target: LOG_TARGET, "Error reading message: {:?}", e); self.stream = None; continue; }, @@ -364,19 +402,19 @@ impl Controller { // Talk to the miner algorithm while let Some(message) = self.rx.try_iter().next() { - debug!("Client received message: {:?}", message); + debug!(target: LOG_TARGET_FILE, "Client received message: {:?}", message); let result = match message { types::client_message::ClientMessage::FoundSolution(job_id, hash, nonce) => { self.send_message_submit(job_id, hash, nonce) }, types::client_message::ClientMessage::KeepAlive => self.send_keepalive(), types::client_message::ClientMessage::Shutdown => { - debug!("Shutting down client controller"); + debug!(target: LOG_TARGET_FILE, "Shutting down client controller"); return; }, }; if let Err(e) = result { - error!("Mining Controller Error {:?}", e); + error!(target: LOG_TARGET, "Mining Controller Error {:?}", e); self.stream = None; } } diff --git a/applications/tari_mining_node/src/stratum/mod.rs b/applications/tari_mining_node/src/stratum/mod.rs index b426a7c4c5e..b71d5ca82be 100644 --- a/applications/tari_mining_node/src/stratum/mod.rs +++ b/applications/tari_mining_node/src/stratum/mod.rs @@ -24,5 +24,6 @@ pub mod controller; pub mod error; pub mod stratum_controller; pub mod stratum_miner; +pub mod stratum_statistics; pub mod stratum_types; pub mod stream; diff --git a/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs b/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs index f40e432e6f1..b7315600b7c 100644 --- a/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs +++ b/applications/tari_mining_node/src/stratum/stratum_controller/controller.rs @@ -22,10 +22,19 @@ // use crate::{ stratum, - stratum::{stratum_miner::miner::StratumMiner, stratum_types as types}, + stratum::{stratum_miner::miner::StratumMiner, stratum_statistics::stats, stratum_types as types}, }; use log::*; -use std::{self, sync::mpsc, thread, time::SystemTime}; +use std::{ + self, + sync::{mpsc, Arc, RwLock}, + thread, + time::{Duration, SystemTime}, +}; + +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; +const REPORTING_FREQUENCY: u64 = 20; pub struct Controller { rx: mpsc::Receiver, @@ -35,10 +44,12 @@ pub struct Controller { current_job_id: u64, current_blob: String, keep_alive_time: 
SystemTime, + stats: Arc>, + elapsed: SystemTime, } impl Controller { - pub fn new() -> Result { + pub fn new(stats: Arc>) -> Result { let (tx, rx) = mpsc::channel::(); Ok(Controller { rx, @@ -48,6 +59,8 @@ impl Controller { current_job_id: 0, current_blob: "".to_string(), keep_alive_time: SystemTime::now(), + stats, + elapsed: SystemTime::now(), }) } @@ -55,10 +68,66 @@ impl Controller { self.client_tx = Some(client_tx); } + fn display_stats(&mut self, elapsed: Duration) { + let mut stats = self.stats.write().unwrap(); + debug!(target: LOG_TARGET_FILE, "{:?}", stats.mining_stats); + info!( + target: LOG_TARGET, + "{}", + "--------------- Mining Statistics ---------------".to_string() + ); + info!( + target: LOG_TARGET, + "{}", + format!("Number of solver threads: {}", stats.mining_stats.solvers) + ); + if stats.mining_stats.solution_stats.found > 0 { + info!( + target: LOG_TARGET, + "{}", + format!( + "Estimated combined solver share rate: {:.1$} (S/s)", + stats.mining_stats.sols(), + 5 + ) + ); + } + info!( + target: LOG_TARGET, + "{}", + format!( + "Combined solver hash rate: {:.1$} (Mh/s)", + stats.mining_stats.hash_rate(elapsed), + 5 + ) + ); + info!( + target: LOG_TARGET, + "{}", + format!( + "Shares found: {}, accepted: {}, rejected: {}", + stats.mining_stats.solution_stats.found, + stats.mining_stats.solution_stats.found - stats.mining_stats.solution_stats.rejected, + stats.mining_stats.solution_stats.rejected + ) + ); + info!( + target: LOG_TARGET, + "{}", + "-------------------------------------------------".to_string() + ); + } + pub fn run(&mut self, mut miner: StratumMiner) -> Result<(), stratum::error::Error> { loop { + if let Ok(report) = self.elapsed.elapsed() { + if report.as_secs() >= REPORTING_FREQUENCY { + self.display_stats(report); + self.elapsed = SystemTime::now(); + } + } while let Some(message) = self.rx.try_iter().next() { - debug!("Miner received message: {:?}", message); + debug!(target: LOG_TARGET_FILE, "Miner received message: {:?}", message); let result: Result<(), stratum::error::Error> = match message { types::miner_message::MinerMessage::ReceivedJob(height, job_id, diff, blob) => { self.current_height = height; @@ -72,24 +141,27 @@ impl Controller { ) }, types::miner_message::MinerMessage::StopJob => { - debug!("Stopping jobs"); + debug!(target: LOG_TARGET_FILE, "Stopping jobs"); miner.pause_solvers(); Ok(()) }, types::miner_message::MinerMessage::ResumeJob => { - debug!("Resuming jobs"); + debug!(target: LOG_TARGET_FILE, "Resuming jobs"); miner.resume_solvers(); Ok(()) }, types::miner_message::MinerMessage::Shutdown => { - debug!("Stopping jobs and Shutting down mining controller"); + debug!( + target: LOG_TARGET_FILE, + "Stopping jobs and Shutting down mining controller" + ); miner.stop_solvers(); miner.wait_for_solver_shutdown(); Ok(()) }, }; if let Err(e) = result { - error!("Mining Controller Error {:?}", e); + error!(target: LOG_TARGET, "Mining Controller Error {:?}", e); } } @@ -103,6 +175,8 @@ impl Controller { ss.job_id, ss.hash, ss.nonce, )); self.keep_alive_time = SystemTime::now(); + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solution_stats.found += 1; } else if self.keep_alive_time.elapsed().unwrap().as_secs() >= 30 { self.keep_alive_time = SystemTime::now(); let _ = self diff --git a/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs b/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs index 50dc44b6c7c..303edb9f09d 100644 --- 
a/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs +++ b/applications/tari_mining_node/src/stratum/stratum_miner/miner.rs @@ -24,11 +24,14 @@ use crate::{ config::MinerConfig, difficulty::BlockHeaderSha3, stratum, - stratum::stratum_miner::{ - control_message::ControlMessage, - job_shared_data::{JobSharedData, JobSharedDataType}, - solution::Solution, - solver_instance::SolverInstance, + stratum::{ + stratum_miner::{ + control_message::ControlMessage, + job_shared_data::{JobSharedData, JobSharedDataType}, + solution::Solution, + solver_instance::SolverInstance, + }, + stratum_statistics::stats::Statistics, }, }; use log::*; @@ -37,22 +40,31 @@ use std::{ sync::{mpsc, Arc, RwLock}, thread, time, + time::{Duration, SystemTime}, }; use tari_core::{ blocks::BlockHeader, crypto::tari_utilities::{hex::Hex, Hashable}, }; +pub const LOG_TARGET: &str = "tari_mining_node::miner::stratum::controller"; +pub const LOG_TARGET_FILE: &str = "tari_mining_node::logging::miner::stratum::controller"; + +fn calculate_sols(elapsed: Duration) -> f64 { + 1.0 / ((elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64) as f64 / 1_000_000_000.0) +} + pub struct StratumMiner { config: MinerConfig, pub shared_data: Arc>, control_txs: Vec>, solver_loop_txs: Vec>, solver_stopped_rxs: Vec>, + stats: Arc>, } impl StratumMiner { - pub fn new(config: MinerConfig) -> StratumMiner { + pub fn new(config: MinerConfig, stats: Arc>) -> StratumMiner { let threads = config.num_mining_threads; StratumMiner { config, @@ -60,6 +72,7 @@ impl StratumMiner { control_txs: vec![], solver_loop_txs: vec![], solver_stopped_rxs: vec![], + stats, } } @@ -70,19 +83,20 @@ impl StratumMiner { control_rx: mpsc::Receiver, solver_loop_rx: mpsc::Receiver, solver_stopped_tx: mpsc::Sender, + statistics: Arc>, ) { let stop_handle = thread::spawn(move || loop { while let Some(message) = control_rx.iter().next() { match message { ControlMessage::Stop => { - info!("Stopping Solvers"); + debug!(target: LOG_TARGET_FILE, "Stopping Solvers"); return; }, ControlMessage::Pause => { - info!("Pausing Solvers"); + debug!(target: LOG_TARGET_FILE, "Pausing Solvers"); }, ControlMessage::Resume => { - info!("Resuming Solvers"); + debug!(target: LOG_TARGET_FILE, "Resuming Solvers"); }, _ => {}, }; @@ -90,16 +104,23 @@ impl StratumMiner { }); let mut paused = true; + let mut timer = SystemTime::now(); loop { if let Some(message) = solver_loop_rx.try_iter().next() { - debug!("solver_thread - solver_loop_rx got msg: {:?}", message); + debug!( + target: LOG_TARGET_FILE, + "solver_thread - solver_loop_rx got msg: {:?}", message + ); match message { ControlMessage::Stop => break, ControlMessage::Pause => { paused = true; solver.solver_reset = true; }, - ControlMessage::Resume => paused = false, + ControlMessage::Resume => { + paused = false; + timer = SystemTime::now(); + }, _ => {}, } } @@ -115,7 +136,6 @@ impl StratumMiner { let height = { shared_data.read().unwrap().height }; let job_id = { shared_data.read().unwrap().job_id }; let target_difficulty = { shared_data.read().unwrap().difficulty }; - let mut hasher = BlockHeaderSha3::new(tari_app_grpc::tari_rpc::BlockHeader::from(header)).unwrap(); if solver.solver_reset { @@ -127,17 +147,20 @@ impl StratumMiner { hasher.inc_nonce(); solver.current_nonce = hasher.nonce; } - + let mut stats = statistics.write().unwrap(); let difficulty = hasher.difficulty(); + stats.mining_stats.add_hash(); if difficulty >= target_difficulty { let block_header: BlockHeader = 
BlockHeader::try_from(hasher.into_header()).unwrap(); info!( + target: LOG_TARGET, "Miner found share with hash {}, nonce {} and difficulty {:?}", block_header.hash().to_hex(), solver.current_nonce, difficulty ); debug!( + target: LOG_TARGET_FILE, "Miner found share with hash {}, difficulty {:?} and data {:?}", block_header.hash().to_hex(), difficulty, @@ -154,6 +177,11 @@ impl StratumMiner { hash: block_header.hash().to_hex(), nonce: block_header.nonce, }); + if let Ok(elapsed) = timer.elapsed() { + let sols = calculate_sols(elapsed); + stats.mining_stats.add_sols(sols); + } + timer = SystemTime::now(); } } solver.solutions = Solution::default(); @@ -169,8 +197,10 @@ impl StratumMiner { } pub fn start_solvers(&mut self) -> Result<(), stratum::error::Error> { + let mut stats = self.stats.write().unwrap(); + stats.mining_stats.solvers = self.config.num_mining_threads; let num_solvers = self.config.num_mining_threads; - info!("Spawning {} solvers", num_solvers); + info!(target: LOG_TARGET, "Spawning {} solvers", num_solvers); let mut solvers = Vec::with_capacity(num_solvers); while solvers.len() < solvers.capacity() { solvers.push(SolverInstance::new()?); @@ -183,8 +213,9 @@ impl StratumMiner { self.control_txs.push(control_tx); self.solver_loop_txs.push(solver_tx); self.solver_stopped_rxs.push(solver_stopped_rx); + let stats = self.stats.clone(); thread::spawn(move || { - StratumMiner::solver_thread(s, i, sd, control_rx, solver_rx, solver_stopped_tx); + StratumMiner::solver_thread(s, i, sd, control_rx, solver_rx, solver_stopped_tx, stats); }); } Ok(()) @@ -216,6 +247,10 @@ impl StratumMiner { sd.difficulty = difficulty; sd.header = Some(header); if paused { + info!( + target: LOG_TARGET, + "Hashing in progress... height: {}, target difficulty: {}", height, difficulty + ); self.resume_solvers(); } Ok(()) @@ -239,7 +274,7 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Stop); } - debug!("Stop message sent"); + debug!(target: LOG_TARGET_FILE, "Stop message sent"); } pub fn pause_solvers(&self) { @@ -249,7 +284,7 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Pause); } - debug!("Pause message sent"); + debug!(target: LOG_TARGET_FILE, "Pause message sent"); } pub fn resume_solvers(&self) { @@ -259,13 +294,13 @@ impl StratumMiner { for t in self.solver_loop_txs.iter() { let _ = t.send(ControlMessage::Resume); } - debug!("Resume message sent"); + debug!(target: LOG_TARGET_FILE, "Resume message sent"); } pub fn wait_for_solver_shutdown(&self) { for r in self.solver_stopped_rxs.iter() { if let Some(ControlMessage::SolverStopped(i)) = r.iter().next() { - debug!("Solver stopped: {}", i); + debug!(target: LOG_TARGET_FILE, "Solver stopped: {}", i); } } } diff --git a/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs b/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs new file mode 100644 index 00000000000..b3ca0d2f63e --- /dev/null +++ b/applications/tari_mining_node/src/stratum/stratum_statistics/mod.rs @@ -0,0 +1 @@ +pub(crate) mod stats; diff --git a/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs b/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs new file mode 100644 index 00000000000..cc0d758dfc0 --- /dev/null +++ b/applications/tari_mining_node/src/stratum/stratum_statistics/stats.rs @@ -0,0 +1,100 @@ +// Copyright 2021. 
The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +use std::time::Duration; + +#[derive(Clone, Debug)] +pub struct SolutionStatistics { + /// Total found + pub found: u32, + /// Total rejected + pub rejected: u32, +} + +impl Default for SolutionStatistics { + fn default() -> SolutionStatistics { + SolutionStatistics { found: 0, rejected: 0 } + } +} + +#[derive(Clone, Debug)] +pub struct MiningStatistics { + /// Solutions per second + sols: Vec, + /// Hashes per second + hashes: u64, + /// Number Solvers + pub solvers: usize, + /// Solution statistics + pub solution_stats: SolutionStatistics, +} + +impl Default for MiningStatistics { + fn default() -> MiningStatistics { + MiningStatistics { + solvers: 0, + sols: vec![], + hashes: 0, + solution_stats: SolutionStatistics::default(), + } + } +} + +impl MiningStatistics { + pub fn add_sols(&mut self, val: f64) { + self.sols.insert(0, val); + self.sols.truncate(60); + } + + pub fn sols(&self) -> f64 { + if self.sols.is_empty() { + 0.0 + } else { + let sum: f64 = self.sols.iter().sum(); + sum / (self.sols.len() as f64) + } + } + + pub fn add_hash(&mut self) { + self.hashes += 1; + } + + pub fn hash_rate(&mut self, elapsed: Duration) -> f64 { + let hash_rate = self.hashes as f64 / elapsed.as_micros() as f64; + // reset the total number of hashes for this interval + self.hashes = 0; + hash_rate + } +} + +#[derive(Clone, Debug)] +pub struct Statistics { + pub mining_stats: MiningStatistics, +} + +impl Default for Statistics { + fn default() -> Statistics { + Statistics { + mining_stats: MiningStatistics::default(), + } + } +} diff --git a/applications/tari_mining_node/src/stratum/stratum_types/mod.rs b/applications/tari_mining_node/src/stratum/stratum_types/mod.rs index 432ab296ce2..4142b77f469 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/mod.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/mod.rs @@ -30,5 +30,5 @@ pub(crate) mod rpc_error; pub(crate) mod rpc_request; pub(crate) mod rpc_response; pub(crate) mod submit_params; +pub(crate) mod submit_response; 
pub(crate) mod worker_identifier; -pub(crate) mod worker_status; diff --git a/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs b/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs index 118a4977ed4..0b26a4891ff 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/rpc_error.rs @@ -22,7 +22,7 @@ // use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RpcError { pub code: i32, pub message: String, diff --git a/applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs b/applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs similarity index 91% rename from applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs rename to applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs index 142e3fcb675..6c190d860cb 100644 --- a/applications/tari_mining_node/src/stratum/stratum_types/worker_status.rs +++ b/applications/tari_mining_node/src/stratum/stratum_types/submit_response.rs @@ -20,14 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // +use crate::stratum::stratum_types::rpc_error::RpcError; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug)] -pub struct WorkerStatus { - pub id: String, - pub height: u64, - pub difficulty: u64, - pub accepted: u64, - pub rejected: u64, - pub stale: u64, +pub struct SubmitResponse { + pub status: Option<String>, + pub error: Option<RpcError>, } diff --git a/common/logging/log4rs_sample_mining_node.yml b/common/logging/log4rs_sample_mining_node.yml index 16c4c437393..c3e85184ecc 100644 --- a/common/logging/log4rs_sample_mining_node.yml +++ b/common/logging/log4rs_sample_mining_node.yml @@ -44,11 +44,14 @@ loggers: appenders: - mining_node additive: false - tari_mining_node: + tari_mining_node::logging: level: debug appenders: - mining_node + additive: false + tari_mining_node::miner: + level: info + appenders: - stdout additive: false - From 53989f40b0ab03a9a894f3489452f94efae94bc1 Mon Sep 17 00:00:00 2001 From: Mike the Tike Date: Mon, 18 Oct 2021 17:22:59 +0200 Subject: [PATCH 2/3] fix: remove is_synced check for transaction validation (#3459) Description --- Remove the check that aborts transaction validation if the base node is not synced. Motivation and Context --- With this check in place, if the base node is syncing even one block when the wallet asks, it will abort the check. There is most likely no relation between the block that the base node is syncing and the data that the wallet is asking for, and the base node may still be able to respond correctly. The logic in tx_validation already handles the fact that a base node may be in progress with a sync, and aborting it early is not necessary. How Has This Been Tested?
--- Manually --- .../transaction_validation_protocol.rs | 8 ----- .../transaction_protocols.rs | 29 +------------------ 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs index 3d758f6d770..fff77520b0c 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_validation_protocol.rs @@ -271,14 +271,6 @@ where }) .await?; - if !batch_response.is_synced { - info!( - target: LOG_TARGET, - "Base Node reports not being synced, aborting transaction validation" - ); - return Err(TransactionServiceError::BaseNodeNotSynced); - } - for response_proto in batch_response.responses { let response = TxQueryBatchResponse::try_from(response_proto) .map_err(TransactionServiceError::ProtobufConversionError)?; diff --git a/base_layer/wallet/tests/transaction_service/transaction_protocols.rs b/base_layer/wallet/tests/transaction_service/transaction_protocols.rs index a0b3e8b3d16..cf6aca4ccc8 100644 --- a/base_layer/wallet/tests/transaction_service/transaction_protocols.rs +++ b/base_layer/wallet/tests/transaction_service/transaction_protocols.rs @@ -71,7 +71,7 @@ use tari_wallet::{ storage::sqlite_utilities::run_migration_and_create_sqlite_connection, transaction_service::{ config::TransactionServiceConfig, - error::{TransactionServiceError, TransactionServiceProtocolError}, + error::TransactionServiceError, handle::{TransactionEvent, TransactionEventReceiver, TransactionEventSender}, protocols::{ transaction_broadcast_protocol::TransactionBroadcastProtocol, @@ -781,33 +781,6 @@ async fn tx_validation_protocol_tx_becomes_mined_unconfirmed_then_confirmed() { rpc_service_state.set_transaction_query_batch_responses(batch_query_response.clone()); - rpc_service_state.set_is_synced(false); - - wallet_connectivity.notify_base_node_set(server_node_identity.to_peer()); - - let protocol = TransactionValidationProtocol::new( - 1, - resources.db.clone(), - wallet_connectivity.clone(), - resources.config.clone(), - resources.event_publisher.clone(), - resources.output_manager_service.clone(), - ); - - let join_handle = task::spawn(protocol.execute()); - - // Check that the protocol ends with error due to base node not being synced - let result = join_handle.await.unwrap(); - assert!(matches!( - result, - Err(TransactionServiceProtocolError { - id: 1, - error: TransactionServiceError::BaseNodeNotSynced, - }) - )); - - rpc_service_state.set_is_synced(true); - let protocol = TransactionValidationProtocol::new( 2, resources.db.clone(), From 80f7c78b296a48eda3d3e69d266396482679f35a Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 18 Oct 2021 20:47:34 +0400 Subject: [PATCH 3/3] fix: ensure that accumulated orphan chain data is committed before header validation (#3462) Description --- - commit orphan chain accumulated data before orphan header validation so that accumulated target difficulty is available for chain comparison - use hash for reorg chain rewind because the height value of the candidate block is not yet checked against the fork height. 
- improve test infra to allow blockchain tests to be easily constructed - add a failing test for the fix in the PR Motivation and Context --- Observed errors: ``` 2021-10-10 09:17:19.987800805 [c::cs::database] WARN Discarding orphan 0291ba64d777e46016ca7b055bdf6979c1fe11bf31b78a7c20d507cb69c3f293 because it has an invalid header: FatalStorageError("The requested chain_header_in_all_chains was not found via hash:9c2c5734a783d891b617905e27e861ac1595760d5c22335c8d31feb7dc38a2a5 in the database") ``` The above error occurs because the orphan accumulated data was set in the `DbTransaction` but was not actually committed to LMDB. This manifests as a validation error and was not picked up by other tests because the validator was mocked out. The solution in this PR is to write the transactions as needed. This means that in a rollback situation, the accumulated orphan chain data will remain. That _should_ be ok since that data can be overwritten/replaced on the next reorg evaluation if needed. This demonstrates the shortcomings of the current approach and the need for the calling code in `BlockchainDatabase` to have access to ACID DB transactions. I have played with the idea of abstracting and generifying atomic transactions but found that [GATs](https://blog.rust-lang.org/2021/08/03/GATs-stabilization-push.html) would be needed to get it to work. We no longer have or use any `BlockchainBackend` impl other than LMDB, so leaking the LMDB `Read`/`WriteTransaction` types may be acceptable. Passing the transaction into every function in `BlockchainBackend` may be a deal breaker. How Has This Been Tested? --- - Issue reproduced in `handle_possible_reorg::it_links_many_orphan_branches_to_main_chain` - Manually (not necessarily conclusive: the base node has been running for a day and is still at the tip) Consolidated some of the existing "blockchain builder" test infra that was duplicated, and added a declarative macro for building chains: `let specs = block_specs!(["A->GB"], ["B->A", difficulty: 100]);` --- .../horizon_state_synchronization.rs | 2 +- .../src/chain_storage/accumulated_data.rs | 2 +- .../src/chain_storage/block_add_result.rs | 13 +- .../src/chain_storage/blockchain_database.rs | 382 +++++++++++++----- .../core/src/chain_storage/db_transaction.rs | 12 +- .../core/src/chain_storage/lmdb_db/lmdb_db.rs | 117 +++--- base_layer/core/src/lib.rs | 1 + .../core/src/test_helpers/block_spec.rs | 238 +++++++++++ .../core/src/test_helpers/blockchain.rs | 39 +- base_layer/core/src/test_helpers/mod.rs | 65 +-- .../validation/block_validators/body_only.rs | 14 +- .../core/src/validation/chain_balance.rs | 2 +- base_layer/core/src/validation/mocks.rs | 4 +- base_layer/core/src/validation/test.rs | 6 +- base_layer/core/src/validation/traits.rs | 4 +- base_layer/core/tests/block_validation.rs | 10 +- 16 files changed, 642 insertions(+), 269 deletions(-) create mode 100644 base_layer/core/src/test_helpers/block_spec.rs diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs index 3511ade2061..7e10d597457 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs @@ -604,10 +604,10 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> { .sync_validators
.final_horizon_state .validate( + &*self.db().clone().into_inner().db_read_access()?, header.height(), &pruned_utxo_sum, &pruned_kernel_sum, - &*self.db().clone().into_inner().db_read_access()?, ) .map_err(HorizonSyncError::FinalStateValidationFailed)?; diff --git a/base_layer/core/src/chain_storage/accumulated_data.rs b/base_layer/core/src/chain_storage/accumulated_data.rs index 5c050290d49..c7eef86bd07 100644 --- a/base_layer/core/src/chain_storage/accumulated_data.rs +++ b/base_layer/core/src/chain_storage/accumulated_data.rs @@ -273,7 +273,7 @@ impl BlockHeaderAccumulatedDataBuilder<'_> { .hash .ok_or_else(|| ChainStorageError::InvalidOperation("hash not provided".to_string()))?; - if hash == self.previous_accum.hash { + if hash == previous_accum.hash { return Err(ChainStorageError::InvalidOperation( "Hash was set to the same hash that is contained in previous accumulated data".to_string(), )); diff --git a/base_layer/core/src/chain_storage/block_add_result.rs b/base_layer/core/src/chain_storage/block_add_result.rs index 26b0107ed00..4c9a783f17a 100644 --- a/base_layer/core/src/chain_storage/block_add_result.rs +++ b/base_layer/core/src/chain_storage/block_add_result.rs @@ -46,6 +46,14 @@ impl BlockAddResult { matches!(self, BlockAddResult::Ok(_)) } + pub fn is_chain_reorg(&self) -> bool { + matches!(self, BlockAddResult::ChainReorg { .. }) + } + + pub fn is_orphaned(&self) -> bool { + matches!(self, BlockAddResult::OrphanBlock) + } + pub fn assert_added(&self) -> ChainBlock { match self { BlockAddResult::ChainReorg { added, removed } => panic!( @@ -60,10 +68,7 @@ impl BlockAddResult { } pub fn assert_orphaned(&self) { - match self { - BlockAddResult::OrphanBlock => (), - _ => panic!("Result was not orphaned"), - } + assert!(self.is_orphaned(), "Result was not orphaned"); } pub fn assert_reorg(&self, num_added: usize, num_removed: usize) { diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 6053ccef959..34a4cce5da5 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -849,7 +849,7 @@ where B: BlockchainBackend fn insert_block(&self, block: Arc) -> Result<(), ChainStorageError> { let mut db = self.db_write_access()?; let mut txn = DbTransaction::new(); - insert_block(&mut txn, block)?; + insert_best_block(&mut txn, block)?; db.write(txn) } @@ -1239,8 +1239,8 @@ fn add_block( ) } -// Adds a new block onto the chain tip. -fn insert_block(txn: &mut DbTransaction, block: Arc) -> Result<(), ChainStorageError> { +/// Adds a new block onto the chain tip and sets it to the best block. 
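+/// (Renamed from `insert_block` to make the best-block side effect explicit.)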
+fn insert_best_block(txn: &mut DbTransaction, block: Arc) -> Result<(), ChainStorageError> { let block_hash = block.accumulated_data().hash.clone(); debug!( target: LOG_TARGET, @@ -1432,6 +1432,8 @@ fn rewind_to_height( let metadata = db.fetch_chain_metadata()?; let expected_block_hash = metadata.best_block().clone(); let last_block_height = metadata.height_of_longest_chain(); + // We use the cmp::max value here because we'll only delete headers here and leave remaining headers to be deleted + // with the whole block let steps_back = last_header_height .checked_sub(cmp::max(last_block_height, height)) .ok_or_else(|| { @@ -1461,9 +1463,8 @@ fn rewind_to_height( }); // Delete blocks - let mut steps_back = last_block_height.saturating_sub(height); - // No blocks to remove + // No blocks to remove, no need to update the best block if steps_back == 0 { db.write(txn)?; return Ok(vec![]); @@ -1477,16 +1478,17 @@ fn rewind_to_height( last_block_height - steps_back ); - let prune_past_horizon = metadata.is_pruned_node() && steps_back > metadata.pruning_horizon(); + let effective_pruning_horizon = metadata.height_of_longest_chain() - metadata.pruned_height(); + let prune_past_horizon = metadata.is_pruned_node() && steps_back > effective_pruning_horizon; if prune_past_horizon { warn!( target: LOG_TARGET, - "WARNING, reorg past pruning horizon, rewinding back to 0" + "WARNING, reorg past pruning horizon (more than {} blocks back), rewinding back to 0", + effective_pruning_horizon ); - steps_back = metadata.pruning_horizon(); + steps_back = effective_pruning_horizon; height = 0; } - let chain_header = db.fetch_chain_header_by_height(height)?; for h in 0..steps_back { info!(target: LOG_TARGET, "Deleting block {}", last_block_height - h,); @@ -1519,6 +1521,7 @@ fn rewind_to_height( } } + let chain_header = db.fetch_chain_header_by_height(height)?; // Update metadata debug!( target: LOG_TARGET, @@ -1585,7 +1588,7 @@ fn handle_possible_reorg( } // Check the accumulated difficulty of the best fork chain compared to the main chain. 
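// Note: `find_strongest_orphan_tip` now returns an `Option` directly rather than a `Result`, so the `?` is dropped from the call below.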
- let fork_header = find_strongest_orphan_tip(new_tips, chain_strength_comparer)?.ok_or_else(|| { + let fork_header = find_strongest_orphan_tip(new_tips, chain_strength_comparer).ok_or_else(|| { // This should never happen because a block is always added to the orphan pool before // checking, but just in case warn!( @@ -1654,16 +1657,15 @@ fn handle_possible_reorg( // TODO: We already have the first link in this chain, can be optimized to exclude it let reorg_chain = get_orphan_link_main_chain(db, fork_header.hash())?; - let fork_height = reorg_chain + let fork_hash = reorg_chain .front() .expect("The new orphan block should be in the queue") - .block() - .header - .height - - 1; + .header() + .prev_hash + .clone(); let num_added_blocks = reorg_chain.len(); - let removed_blocks = reorganize_chain(db, block_validator, fork_height, &reorg_chain)?; + let removed_blocks = reorganize_chain(db, block_validator, fork_hash, &reorg_chain)?; let num_removed_blocks = removed_blocks.len(); // reorg is required when any blocks are removed or more than one are added @@ -1708,15 +1710,15 @@ fn handle_possible_reorg( fn reorganize_chain( backend: &mut T, block_validator: &dyn PostOrphanBodyValidation, - fork_height: u64, + fork_hash: HashOutput, chain: &VecDeque>, ) -> Result>, ChainStorageError> { - let removed_blocks = rewind_to_height(backend, fork_height)?; + let removed_blocks = rewind_to_hash(backend, fork_hash.clone())?; debug!( target: LOG_TARGET, - "Validate and add {} chain block(s) from height {}. Rewound blocks: [{}]", + "Validate and add {} chain block(s) from block {}. Rewound blocks: [{}]", chain.len(), - fork_height, + fork_hash.to_hex(), removed_blocks .iter() .map(|b| b.height().to_string()) @@ -1726,25 +1728,25 @@ fn reorganize_chain( for block in chain { let mut txn = DbTransaction::new(); - let block_hash_hex = block.accumulated_data().hash.to_hex(); - txn.delete_orphan(block.accumulated_data().hash.clone()); + let block_hash = block.hash().clone(); + txn.delete_orphan(block_hash.clone()); let chain_metadata = backend.fetch_chain_metadata()?; - if let Err(e) = block_validator.validate_body_for_valid_orphan(block, backend, &chain_metadata) { + if let Err(e) = block_validator.validate_body_for_valid_orphan(backend, block, &chain_metadata) { warn!( target: LOG_TARGET, "Orphan block {} ({}) failed validation during chain reorg: {:?}", block.header().height, - block_hash_hex, + block_hash.to_hex(), e ); - remove_orphan(backend, block.accumulated_data().hash.clone())?; + remove_orphan(backend, block_hash)?; info!(target: LOG_TARGET, "Restoring previous chain after failed reorg."); - restore_reorged_chain(backend, fork_height, removed_blocks)?; + restore_reorged_chain(backend, fork_hash, removed_blocks)?; return Err(e.into()); } - insert_block(&mut txn, block.clone())?; + insert_best_block(&mut txn, block.clone())?; // Failed to store the block - this should typically never happen unless there is a bug in the validator // (e.g. does not catch a double spend). In any case, we still need to restore the chain to a // good state before returning. @@ -1754,7 +1756,7 @@ fn reorganize_chain( "Failed to commit reorg chain: {:?}. 
Restoring last chain.", e ); - restore_reorged_chain(backend, fork_height, removed_blocks)?; + restore_reorged_chain(backend, fork_hash, removed_blocks)?; return Err(e); } } @@ -1773,10 +1775,10 @@ fn reorganize_chain( fn restore_reorged_chain( db: &mut T, - height: u64, + to_hash: HashOutput, previous_chain: Vec>, ) -> Result<(), ChainStorageError> { - let invalid_chain = rewind_to_height(db, height)?; + let invalid_chain = rewind_to_hash(db, to_hash)?; debug!( target: LOG_TARGET, "Removed {} blocks during chain restore: {:?}.", @@ -1790,7 +1792,7 @@ fn restore_reorged_chain( for block in previous_chain.into_iter().rev() { txn.delete_orphan(block.accumulated_data().hash.clone()); - insert_block(&mut txn, block)?; + insert_best_block(&mut txn, block)?; } db.write(txn)?; Ok(()) @@ -1810,10 +1812,11 @@ fn insert_orphan_and_find_new_tips( return Ok(vec![]); } - let mut txn = DbTransaction::new(); let parent = match db.fetch_orphan_chain_tip_by_hash(&block.header.prev_hash)? { Some(curr_parent) => { + let mut txn = DbTransaction::new(); txn.remove_orphan_chain_tip(block.header.prev_hash.clone()); + db.write(txn)?; info!( target: LOG_TARGET, "New orphan extends a chain in the current candidate tip set" @@ -1827,22 +1830,32 @@ fn insert_orphan_and_find_new_tips( Some(curr_parent) => { info!( target: LOG_TARGET, - "New orphan does not have a parent in the current tip set. Parent is {}", + "New orphan #{} ({}) does not have a parent in the current tip set. Parent is {}", + block.header.height, + hash.to_hex(), curr_parent.hash().to_hex() ); curr_parent }, None => { - info!( - target: LOG_TARGET, - "Orphan {} was not connected to any previous headers. Inserting as true orphan", - hash.to_hex() - ); - - if !db.contains(&DbKey::OrphanBlock(hash))? { + let hash_hex = hash.to_hex(); + if db.contains(&DbKey::OrphanBlock(hash))? { + info!( + target: LOG_TARGET, + "Orphan #{} ({}) already found in orphan database", block.header.height, hash_hex + ); + } else { + info!( + target: LOG_TARGET, + "Orphan #{} ({}) was not connected to any previous headers. Inserting as true orphan", + block.header.height, + hash_hex + ); + + let mut txn = DbTransaction::new(); txn.insert_orphan(block); + db.write(txn)?; } - db.write(txn)?; return Ok(vec![]); }, }, @@ -1861,10 +1874,16 @@ fn insert_orphan_and_find_new_tips( let chain_header = chain_block.to_chain_header(); // Extend orphan chain tip. - txn.insert_chained_orphan(Arc::new(chain_block)); + let mut txn = DbTransaction::new(); + if !db.contains(&DbKey::OrphanBlock(chain_block.accumulated_data().hash.clone()))? 
{
+        txn.insert_orphan(chain_block.to_arc_block());
+    }
+    txn.set_accumulated_data_for_orphan(chain_block.accumulated_data().clone());
+    db.write(txn)?;
 
-    let tips = find_orphan_descendant_tips_of(&*db, &chain_header, validator, difficulty_calculator, &mut txn)?;
+    let tips = find_orphan_descendant_tips_of(db, chain_header, validator, difficulty_calculator)?;
     debug!(target: LOG_TARGET, "Found {} new orphan tips", tips.len());
+    let mut txn = DbTransaction::new();
     for new_tip in &tips {
         txn.insert_orphan_chain_tip(new_tip.hash().clone());
     }
@@ -1875,11 +1894,10 @@ fn insert_orphan_and_find_new_tips(
 
 // Find the tip set of any orphans that have hash as an ancestor
 fn find_orphan_descendant_tips_of<T: BlockchainBackend>(
-    db: &T,
-    prev_chain_header: &ChainHeader,
+    db: &mut T,
+    prev_chain_header: ChainHeader,
     validator: &dyn HeaderValidation<T>,
     difficulty_calculator: &DifficultyCalculator,
-    txn: &mut DbTransaction,
 ) -> Result<Vec<ChainHeader>, ChainStorageError> {
     let children = db.fetch_orphan_children_of(prev_chain_header.hash().clone())?;
     if children.is_empty() {
@@ -1889,11 +1907,19 @@ fn find_orphan_descendant_tips_of<T: BlockchainBackend>(
             prev_chain_header.height(),
             prev_chain_header.hash().to_hex()
         );
-        return Ok(vec![prev_chain_header.clone()]);
+        return Ok(vec![prev_chain_header]);
     }
     let mut res = vec![];
     for child in children {
+        debug!(
+            target: LOG_TARGET,
+            "Validating header #{} ({}), descendant of #{} ({})",
+            child.header.height,
+            child.hash().to_hex(),
+            prev_chain_header.height(),
+            prev_chain_header.hash().to_hex()
+        );
         match validator.validate(db, &child.header, difficulty_calculator) {
             Ok(achieved_target) => {
                 let child_hash = child.hash();
@@ -1911,10 +1937,10 @@ fn find_orphan_descendant_tips_of<T: BlockchainBackend>(
                     })?;
 
                 // Set/overwrite accumulated data for this orphan block
-                txn.set_accumulated_data_for_orphan(chain_header.clone());
-
-                let children =
-                    find_orphan_descendant_tips_of(db, &chain_header, validator, difficulty_calculator, txn)?;
+                let mut txn = DbTransaction::new();
+                txn.set_accumulated_data_for_orphan(chain_header.accumulated_data().clone());
+                db.write(txn)?;
+                let children = find_orphan_descendant_tips_of(db, chain_header, validator, difficulty_calculator)?;
                 res.extend(children);
             },
             Err(e) => {
@@ -1925,7 +1951,9 @@ fn find_orphan_descendant_tips_of<T: BlockchainBackend>(
                     child.hash().to_hex(),
                     e
                 );
+                let mut txn = DbTransaction::new();
                 txn.delete_orphan(child.hash());
+                db.write(txn)?;
             },
         };
     }
@@ -1939,7 +1967,9 @@ fn remove_orphan<T: BlockchainBackend>(db: &mut T, hash: HashOutput) -> Result<(
     db.write(txn)
 }
 
-/// Gets all blocks ordered from the orphan tip to the point (exclusive) where it connects to the best chain.
+/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip.
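+/// For example, if the best chain is GB -> A and the orphan pool holds B -> C -> D (B's prev_hash being A's
+/// hash), calling this with D's hash returns the blocks ordered as [B, C, D].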
// TODO: this would probably perform better if it reused the db transaction
 #[allow(clippy::ptr_arg)]
 fn get_orphan_link_main_chain<T: BlockchainBackend>(
@@ -1971,7 +1999,7 @@ fn get_orphan_link_main_chain<T: BlockchainBackend>(
 fn find_strongest_orphan_tip(
     orphan_chain_tips: Vec<ChainHeader>,
     chain_strength_comparer: &dyn ChainStrengthComparer,
-) -> Result<Option<ChainHeader>, ChainStorageError> {
+) -> Option<ChainHeader> {
     let mut best_block_header: Option<ChainHeader> = None;
     for tip in orphan_chain_tips {
         best_block_header = match best_block_header {
@@ -1983,7 +2011,7 @@ fn find_strongest_orphan_tip(
         };
     }
 
-    Ok(best_block_header)
+    best_block_header
 }
 
 // Perform a comprehensive search to remove all the minimum height orphans to maintain the configured orphan pool
@@ -2092,24 +2120,29 @@ fn convert_to_option_bounds<T: RangeBounds<u64>>(bounds: T) -> (Option<u64>, Opt
 mod test {
     use super::*;
     use crate::{
+        block_specs,
         consensus::{
             chain_strength_comparer::strongest_chain,
             consensus_constants::PowAlgorithmConstants,
             ConsensusConstantsBuilder,
             ConsensusManager,
         },
-        test_helpers::blockchain::{
-            create_chained_blocks,
-            create_main_chain,
-            create_new_blockchain,
-            create_orphan_chain,
-            create_test_blockchain_db,
-            TempDatabase,
+        test_helpers::{
+            blockchain::{
+                create_chained_blocks,
+                create_main_chain,
+                create_new_blockchain,
+                create_orphan_chain,
+                create_test_blockchain_db,
+                TempDatabase,
+            },
+            BlockSpecs,
         },
         validation::{header_validator::HeaderValidator, mocks::MockValidator},
     };
-    use std::collections::HashMap;
+    use std::{collections::HashMap, sync};
     use tari_common::configuration::Network;
+    use tari_test_utils::unpack_enum;
 
     #[test]
     fn lmdb_fetch_monero_seeds() {
@@ -2208,7 +2241,7 @@ mod test {
         let db = create_new_blockchain();
         let validator = MockValidator::new(true);
         let genesis_block = db.fetch_block(0).unwrap().try_into_chain_block().map(Arc::new).unwrap();
-        let (_, chain) = create_chained_blocks(&[("A->GB", 1, 120)], genesis_block);
+        let (_, chain) = create_chained_blocks(&[("A->GB", 1u64, 120u64)], genesis_block);
         let block = chain.get("A").unwrap().clone();
         let mut access = db.db_write_access().unwrap();
         let chain = insert_orphan_and_find_new_tips(
@@ -2296,6 +2329,118 @@ mod test {
         }
     }
 
+    mod handle_possible_reorg {
+        use super::*;
+
+        #[test]
+        fn it_links_many_orphan_branches_to_main_chain() {
+            let test = TestHarness::setup();
+
+            let (_, main_chain) =
+                create_main_chain(&test.db, block_specs!(["1a->GB"], ["2a->1a"], ["3a->2a"], ["4a->3a"]));
+            let genesis = main_chain.get("GB").unwrap().clone();
+
+            let fork_root = main_chain.get("1a").unwrap().clone();
+            let (_, orphan_chain_b) = create_chained_blocks(
+                block_specs!(["2b->GB"], ["3b->2b"], ["4b->3b"], ["5b->4b"], ["6b->5b"]),
+                fork_root,
+            );
+
+            // Add orphans out of height order
+            for name in ["5b", "3b", "4b", "6b"] {
+                let block = orphan_chain_b.get(name).unwrap().clone();
+                let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
+                assert!(result.is_orphaned());
+            }
+
+            // Add chain c orphans branching from chain b
+            let fork_root = orphan_chain_b.get("3b").unwrap().clone();
+            let (_, orphan_chain_c) =
+                create_chained_blocks(block_specs!(["4c->GB"], ["5c->4c"], ["6c->5c"], ["7c->6c"]), fork_root);
+
+            for name in ["7c", "5c", "6c", "4c"] {
+                let block = orphan_chain_c.get(name).unwrap().clone();
+                let result = test.handle_possible_reorg(block.to_arc_block()).unwrap();
+                assert!(result.is_orphaned());
+            }
+
+            let fork_root = orphan_chain_c.get("6c").unwrap().clone();
+            let (_, orphan_chain_d) = create_chained_blocks(block_specs!(["7d->GB", difficulty: 10]), fork_root);
+
+            let block =
orphan_chain_d.get("7d").unwrap().clone(); + let result = test.handle_possible_reorg(block.to_arc_block()).unwrap(); + assert!(result.is_orphaned()); + + // Now, connect the chain and check that the c branch is the tip + let block = orphan_chain_b.get("2b").unwrap().clone(); + let result = test.handle_possible_reorg(block.to_arc_block()).unwrap(); + result.assert_reorg(6, 3); + + { + // Check 2b was added + let access = test.db_write_access(); + let block = orphan_chain_b.get("2b").unwrap().clone(); + assert!(access.contains(&DbKey::BlockHash(block.hash().clone())).unwrap()); + + // Check 7d is the tip + let block = orphan_chain_d.get("7d").unwrap().clone(); + let tip = access.fetch_tip_header().unwrap(); + assert_eq!(tip.hash(), block.hash()); + let metadata = access.fetch_chain_metadata().unwrap(); + assert_eq!(metadata.best_block(), block.hash()); + assert_eq!(metadata.height_of_longest_chain(), block.height()); + assert!(access.contains(&DbKey::BlockHash(block.hash().clone())).unwrap()); + + let mut all_blocks = main_chain + .into_iter() + .chain(orphan_chain_b) + .chain(orphan_chain_c) + .chain(orphan_chain_d) + .collect::>(); + all_blocks.insert("GB".to_string(), genesis); + // Check the chain heights + let expected_chain = ["GB", "1a", "2b", "3b", "4c", "5c", "6c", "7d"]; + for (height, name) in expected_chain.iter().enumerate() { + let expected_block = all_blocks.get(*name).unwrap(); + unpack_enum!( + DbValue::BlockHeader(found_block) = + access.fetch(&DbKey::BlockHeader(height as u64)).unwrap().unwrap() + ); + assert_eq!(*found_block, *expected_block.header()); + } + } + } + + #[test] + fn it_errors_if_reorging_to_an_invalid_height() { + let test = TestHarness::setup(); + let (_, main_chain) = + create_main_chain(&test.db, block_specs!(["1a->GB"], ["2a->1a"], ["3a->2a"], ["4a->3a"])); + + let fork_root = main_chain.get("1a").unwrap().clone(); + let (_, orphan_chain_b) = + create_chained_blocks(block_specs!(["2b->GB", height: 10, difficulty: 10]), fork_root); + + let block = orphan_chain_b.get("2b").unwrap().clone(); + let err = test.handle_possible_reorg(block.to_arc_block()).unwrap_err(); + unpack_enum!(ChainStorageError::InvalidOperation(_) = err); + } + + #[test] + fn it_allows_orphan_blocks_with_any_height() { + let test = TestHarness::setup(); + let (_, main_chain) = create_main_chain(&test.db, block_specs!(["1a->GB", difficulty: 2])); + + let fork_root = main_chain.get("GB").unwrap().clone(); + let (_, orphan_chain_b) = create_chained_blocks(block_specs!(["1b->GB", height: 10]), fork_root); + + let block = orphan_chain_b.get("1b").unwrap().clone(); + test.handle_possible_reorg(block.to_arc_block()) + .unwrap() + .assert_orphaned(); + } + } + #[test] fn test_handle_possible_reorg_case1() { // Normal chain @@ -2684,33 +2829,71 @@ mod test { assert_eq!(accum_difficulty, values); } - #[allow(clippy::type_complexity)] - fn test_case_handle_possible_reorg( - blocks: &[(&str, u64, u64)], - ) -> Result<(Vec, HashMap>), ChainStorageError> { - let db = create_new_blockchain(); - let genesis_block = db.fetch_block(0).unwrap().try_into_chain_block().map(Arc::new).unwrap(); - let (block_names, chain) = create_chained_blocks(blocks, genesis_block); - let mock_validator = Box::new(MockValidator::new(true)); - // A real validator is needed here to test target difficulties + struct TestHarness { + db: BlockchainDatabase, + chain_strength_comparer: Box, + difficulty_calculator: DifficultyCalculator, + post_orphan_body_validator: Box>, + header_validator: Box>, + } - let consensus = 
ConsensusManager::builder(Network::LocalNet) - .add_consensus_constants( - ConsensusConstantsBuilder::new(Network::LocalNet) - .clear_proof_of_work() - .add_proof_of_work(PowAlgorithm::Sha3, PowAlgorithmConstants { - max_target_time: 1200, - min_difficulty: 1.into(), - max_difficulty: 100.into(), - target_time: 120, - }) - .build(), + impl TestHarness { + pub fn setup() -> Self { + let consensus = create_consensus_rules(); + let header_validator = Box::new(HeaderValidator::new(consensus)); + let mock_validator = Box::new(MockValidator::new(true)); + Self::new(header_validator, mock_validator) + } + + pub fn new( + header_validator: Box>, + post_orphan_body_validator: Box>, + ) -> Self { + let db = create_new_blockchain(); + let consensus = create_consensus_rules(); + let difficulty_calculator = DifficultyCalculator::new(consensus, Default::default()); + let chain_strength_comparer = strongest_chain().by_sha3_difficulty().build(); + Self { + db, + chain_strength_comparer, + difficulty_calculator, + header_validator, + post_orphan_body_validator, + } + } + + pub fn db_write_access(&self) -> sync::RwLockWriteGuard<'_, TempDatabase> { + self.db.db_write_access().unwrap() + } + + pub fn handle_possible_reorg(&self, block: Arc) -> Result { + let mut access = self.db_write_access(); + handle_possible_reorg( + &mut *access, + &*self.post_orphan_body_validator, + &*self.header_validator, + &self.difficulty_calculator, + &*self.chain_strength_comparer, + block, ) - .build(); + } + } + + #[allow(clippy::type_complexity)] + fn test_case_handle_possible_reorg>( + blocks: T, + ) -> Result<(Vec, HashMap>), ChainStorageError> { + let test = TestHarness::setup(); + // let db = create_new_blockchain(); + let genesis_block = test + .db + .fetch_block(0) + .unwrap() + .try_into_chain_block() + .map(Arc::new) + .unwrap(); + let (block_names, chain) = create_chained_blocks(blocks.into(), genesis_block); - let difficulty_calculator = DifficultyCalculator::new(consensus.clone(), Default::default()); - let header_validator = Box::new(HeaderValidator::new(consensus)); - let chain_strength_comparer = strongest_chain().by_sha3_difficulty().build(); let mut results = vec![]; for name in block_names { let block = chain.get(&name.to_string()).unwrap(); @@ -2720,15 +2903,24 @@ mod test { block.hash().to_hex(), block.header().prev_hash.to_hex() ); - results.push(handle_possible_reorg( - &mut *db.db_write_access()?, - &*mock_validator, - &*header_validator, - &difficulty_calculator, - &*chain_strength_comparer, - block.to_arc_block(), - )?); + results.push(test.handle_possible_reorg(block.to_arc_block()).unwrap()); } Ok((results, chain)) } + + fn create_consensus_rules() -> ConsensusManager { + ConsensusManager::builder(Network::LocalNet) + .add_consensus_constants( + ConsensusConstantsBuilder::new(Network::LocalNet) + .clear_proof_of_work() + .add_proof_of_work(PowAlgorithm::Sha3, PowAlgorithmConstants { + max_target_time: 1200, + min_difficulty: 1.into(), + max_difficulty: 100.into(), + target_time: 120, + }) + .build(), + ) + .build() + } } diff --git a/base_layer/core/src/chain_storage/db_transaction.rs b/base_layer/core/src/chain_storage/db_transaction.rs index e004682da6c..e3bd5fc6458 100644 --- a/base_layer/core/src/chain_storage/db_transaction.rs +++ b/base_layer/core/src/chain_storage/db_transaction.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
use crate::{ blocks::{Block, BlockHeader}, - chain_storage::{error::ChainStorageError, ChainBlock, ChainHeader, MmrTree}, + chain_storage::{error::ChainStorageError, BlockHeaderAccumulatedData, ChainBlock, ChainHeader, MmrTree}, transactions::transaction::{TransactionKernel, TransactionOutput}, }; use croaring::Bitmap; @@ -214,9 +214,9 @@ impl DbTransaction { /// Sets accumulated data for the orphan block, "upgrading" the orphan block to a chained orphan. /// Any existing accumulated data is overwritten. /// The transaction will rollback and write will return an error if the orphan block does not exist. - pub fn set_accumulated_data_for_orphan(&mut self, chain_header: ChainHeader) -> &mut Self { + pub fn set_accumulated_data_for_orphan(&mut self, accumulated_data: BlockHeaderAccumulatedData) -> &mut Self { self.operations - .push(WriteOperation::SetAccumulatedDataForOrphan(chain_header)); + .push(WriteOperation::SetAccumulatedDataForOrphan(accumulated_data)); self } @@ -318,7 +318,7 @@ pub enum WriteOperation { header_hash: HashOutput, kernel_sum: Commitment, }, - SetAccumulatedDataForOrphan(ChainHeader), + SetAccumulatedDataForOrphan(BlockHeaderAccumulatedData), SetBestBlock { height: u64, hash: HashOutput, @@ -415,8 +415,8 @@ impl fmt::Display for WriteOperation { horizon ), UpdateKernelSum { header_hash, .. } => write!(f, "Update kernel sum for block: {}", header_hash.to_hex()), - SetAccumulatedDataForOrphan(chain_header) => { - write!(f, "Set accumulated data for orphan {}", chain_header.hash().to_hex()) + SetAccumulatedDataForOrphan(accumulated_data) => { + write!(f, "Set accumulated data for orphan {}", accumulated_data) }, SetBestBlock { height, diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index f58eea66bec..1bae3c9ab2c 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -203,12 +203,12 @@ impl LMDBDatabase { _file_lock: Arc::new(file_lock), }; - db.build_indexes()?; + db.check_if_rebuild_required()?; Ok(db) } - fn build_indexes(&self) -> Result<(), ChainStorageError> { + fn check_if_rebuild_required(&self) -> Result<(), ChainStorageError> { let txn = self.read_transaction()?; if lmdb_len(&txn, &self.deleted_txo_mmr_position_to_height_index)? == 0 && lmdb_len(&txn, &self.inputs_db)? > 0 { @@ -299,20 +299,12 @@ impl LMDBDatabase { InsertMoneroSeedHeight(data, height) => { self.insert_monero_seed_height(&write_txn, data, *height)?; }, - SetAccumulatedDataForOrphan(chain_header) => { - self.set_accumulated_data_for_orphan( - &write_txn, - chain_header.hash(), - chain_header.accumulated_data(), - )?; + SetAccumulatedDataForOrphan(accumulated_data) => { + self.set_accumulated_data_for_orphan(&write_txn, accumulated_data)?; }, InsertChainOrphanBlock(chain_block) => { self.insert_orphan_block(&write_txn, chain_block.block())?; - self.set_accumulated_data_for_orphan( - &write_txn, - chain_block.hash(), - chain_block.accumulated_data(), - )?; + self.set_accumulated_data_for_orphan(&write_txn, chain_block.accumulated_data())?; }, UpdatePrunedHashSet { mmr_tree, @@ -651,24 +643,22 @@ impl LMDBDatabase { Ok(()) } - #[allow(clippy::ptr_arg)] fn set_accumulated_data_for_orphan( &self, txn: &WriteTransaction<'_>, - header_hash: &HashOutput, accumulated_data: &BlockHeaderAccumulatedData, ) -> Result<(), ChainStorageError> { - if !lmdb_exists(txn, &self.orphans_db, header_hash.as_slice())? 
{
+        if !lmdb_exists(txn, &self.orphans_db, accumulated_data.hash.as_slice())? {
             return Err(ChainStorageError::InvalidOperation(format!(
                 "set_accumulated_data_for_orphan: orphan {} does not exist",
-                header_hash.to_hex()
+                accumulated_data.hash.to_hex()
             )));
         }
 
         lmdb_insert(
             txn,
             &self.orphan_header_accumulated_data_db,
-            header_hash.as_slice(),
+            accumulated_data.hash.as_slice(),
             &accumulated_data,
             "orphan_header_accumulated_data_db",
         )?;
@@ -704,10 +694,9 @@ impl LMDBDatabase {
         if let Some(ref last_header) = self.fetch_last_header_in_txn(txn)? {
             if last_header.height != header.height.saturating_sub(1) {
                 return Err(ChainStorageError::InvalidOperation(format!(
-                    "Attempted to insert a header out of order. Was expecting chain height to be {} but current last \
-                     header height is {}",
-                    header.height - 1,
-                    last_header.height
+                    "Attempted to insert a header out of order. The last header height is {} but the new header \
+                     height is {}",
+                    last_header.height, header.height,
                 )));
             }
@@ -953,47 +942,47 @@ impl LMDBDatabase {
     }
 
     fn delete_orphan(&self, txn: &WriteTransaction<'_>, hash: &HashOutput) -> Result<(), ChainStorageError> {
-        if let Some(orphan) = lmdb_get::<_, Block>(txn, &self.orphans_db, hash.as_slice())? {
-            let parent_hash = orphan.header.prev_hash;
-            lmdb_delete_key_value(txn, &self.orphan_parent_map_index, parent_hash.as_slice(), &hash)?;
-
-            // Orphan is a tip hash
-            if lmdb_exists(txn, &self.orphan_chain_tips_db, hash.as_slice())? {
-                lmdb_delete(txn, &self.orphan_chain_tips_db, hash.as_slice(), "orphan_chain_tips_db")?;
-
-                // Parent becomes a tip hash
-                if lmdb_exists(txn, &self.orphans_db, parent_hash.as_slice())? {
-                    lmdb_insert(
-                        txn,
-                        &self.orphan_chain_tips_db,
-                        parent_hash.as_slice(),
-                        &parent_hash,
-                        "orphan_chain_tips_db",
-                    )?;
-                }
-            }
-
-            if lmdb_exists(txn, &self.orphan_header_accumulated_data_db, hash.as_slice())? {
-                lmdb_delete(
-                    txn,
-                    &self.orphan_header_accumulated_data_db,
-                    hash.as_slice(),
-                    "orphan_header_accumulated_data_db",
-                )?;
-            }
-
-            if lmdb_get::<_, BlockHeaderAccumulatedData>(txn, &self.orphan_header_accumulated_data_db, hash.as_slice())?
-                .is_some()
-            {
-                lmdb_delete(
-                    txn,
-                    &self.orphan_header_accumulated_data_db,
-                    hash.as_slice(),
-                    "orphan_header_accumulated_data_db",
-                )?;
-            }
-            lmdb_delete(txn, &self.orphans_db, hash.as_slice(), "orphans_db")?;
-        }
+        let orphan = match lmdb_get::<_, Block>(txn, &self.orphans_db, hash.as_slice())? {
+            Some(orphan) => orphan,
+            None => {
+                // delete_orphan is idempotent
+                debug!(
+                    target: LOG_TARGET,
+                    "delete_orphan: request to delete orphan block {} that was not found.",
+                    hash.to_hex()
+                );
+                return Ok(());
+            },
+        };
+
+        let parent_hash = orphan.header.prev_hash;
+        lmdb_delete_key_value(txn, &self.orphan_parent_map_index, parent_hash.as_slice(), &hash)?;
+
+        // Orphan is a tip hash
+        if lmdb_exists(txn, &self.orphan_chain_tips_db, hash.as_slice())? {
+            lmdb_delete(txn, &self.orphan_chain_tips_db, hash.as_slice(), "orphan_chain_tips_db")?;
+
+            // Parent becomes a tip hash
+            if lmdb_exists(txn, &self.orphans_db, parent_hash.as_slice())? {
+                lmdb_insert(
+                    txn,
+                    &self.orphan_chain_tips_db,
+                    parent_hash.as_slice(),
+                    &parent_hash,
+                    "orphan_chain_tips_db",
+                )?;
+            }
+        }
+
+        if lmdb_exists(txn, &self.orphan_header_accumulated_data_db, hash.as_slice())? {
+            lmdb_delete(
+                txn,
+                &self.orphan_header_accumulated_data_db,
+                hash.as_slice(),
+                "orphan_header_accumulated_data_db",
+            )?;
+        }
+        lmdb_delete(txn, &self.orphans_db, hash.as_slice(), "orphans_db")?;
         Ok(())
     }
@@ -1514,7 +1512,7 @@ impl BlockchainBackend for LMDBDatabase {
         }
 
         Err(ChainStorageError::ValueNotFound {
-            entity: "chain_header_in_all_chains",
+            entity: "chain header (in chain_header_in_all_chains)",
             field: "hash",
             value: hash.to_hex(),
         })
@@ -2015,14 +2013,15 @@
-    fn fetch_orphan_children_of(&self, hash: HashOutput) -> Result<Vec<Block>, ChainStorageError> {
+    fn fetch_orphan_children_of(&self, parent_hash: HashOutput) -> Result<Vec<Block>, ChainStorageError> {
         trace!(
             target: LOG_TARGET,
             "Call to fetch_orphan_children_of({})",
-            hash.to_hex()
+            parent_hash.to_hex()
         );
         let txn = self.read_transaction()?;
-        let orphan_hashes: Vec<HashOutput> = lmdb_get_multiple(&txn, &self.orphan_parent_map_index, hash.as_slice())?;
+        let orphan_hashes: Vec<HashOutput> =
+            lmdb_get_multiple(&txn, &self.orphan_parent_map_index, parent_hash.as_slice())?;
         let mut res = Vec::with_capacity(orphan_hashes.len());
         for hash in orphan_hashes {
             res.push(lmdb_get(&txn, &self.orphans_db, hash.as_slice())?.ok_or_else(|| {
diff --git a/base_layer/core/src/lib.rs b/base_layer/core/src/lib.rs
index 81da0f0c850..8d6b4fc3b5a 100644
--- a/base_layer/core/src/lib.rs
+++ b/base_layer/core/src/lib.rs
@@ -45,6 +45,7 @@ pub mod proof_of_work;
 pub mod validation;
 
 #[cfg(any(test, feature = "base_node"))]
+#[macro_use]
 pub mod test_helpers;
 
 #[cfg(any(feature = "base_node", feature = "base_node_proto"))]
diff --git a/base_layer/core/src/test_helpers/block_spec.rs b/base_layer/core/src/test_helpers/block_spec.rs
new file mode 100644
index 00000000000..997e05ef292
--- /dev/null
+++ b/base_layer/core/src/test_helpers/block_spec.rs
@@ -0,0 +1,242 @@
+// Copyright 2021, The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
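+
+//! Declarative helpers for describing test blockchains. A `BlockSpec` names a block and its parent
+//! (e.g. "2a->1a") and can override the block's difficulty, block time, height, reward and transactions;
+//! `BlockSpecs` is the collection normally produced by the `block_specs!` macro below.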
+
+use crate::{
+    proof_of_work::Difficulty,
+    transactions::{tari_amount::MicroTari, transaction::Transaction},
+};
+
+pub struct BlockSpecs {
+    pub specs: Vec<BlockSpec>,
+}
+
+impl BlockSpecs {
+    pub fn len(&self) -> usize {
+        self.specs.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn into_vec(self) -> Vec<BlockSpec> {
+        self.specs
+    }
+}
+
+impl From<Vec<BlockSpec>> for BlockSpecs {
+    fn from(specs: Vec<BlockSpec>) -> Self {
+        Self { specs }
+    }
+}
+
+impl<'a, const N: usize> From<&'a [(&'static str, u64, u64); N]> for BlockSpecs {
+    fn from(arr: &'a [(&'static str, u64, u64); N]) -> Self {
+        BlockSpecs::from(&arr[..])
+    }
+}
+
+impl<'a> From<&'a [(&'static str, u64, u64)]> for BlockSpecs {
+    fn from(arr: &'a [(&'static str, u64, u64)]) -> Self {
+        Self {
+            specs: arr
+                .iter()
+                .map(|(name, diff, time)| {
+                    BlockSpec::builder()
+                        .with_name(name)
+                        .with_block_time(*time)
+                        .with_difficulty((*diff).into())
+                        .finish()
+                })
+                .collect(),
+        }
+    }
+}
+
+impl IntoIterator for BlockSpecs {
+    type IntoIter = std::vec::IntoIter<BlockSpec>;
+    type Item = BlockSpec;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.specs.into_iter()
+    }
+}
+
+#[macro_export]
+macro_rules! block_spec {
+    (@ { $spec:ident }) => {};
+
+    (@ { $spec: ident } height: $height:expr, $($tail:tt)*) => {
+        $spec = $spec.with_height($height);
+        $crate::block_spec!(@ { $spec } $($tail)*)
+    };
+    (@ { $spec: ident } difficulty: $difficulty:expr, $($tail:tt)*) => {
+        $spec = $spec.with_difficulty($difficulty.into());
+        $crate::block_spec!(@ { $spec } $($tail)*)
+    };
+    (@ { $spec: ident } reward: $reward:expr, $($tail:tt)*) => {
+        $spec = $spec.with_reward($reward.into());
+        $crate::block_spec!(@ { $spec } $($tail)*)
+    };
+
+    (@ { $spec: ident } $k:ident: $v:expr $(,)?) => { $crate::block_spec!(@ { $spec } $k: $v,) };
+
+    ($name:expr, $($tail:tt)+) => {{
+        let mut spec = $crate::block_spec!($name);
+        $crate::block_spec!(@ { spec } $($tail)+);
+        spec.finish()
+    }};
+    ($name:expr $(,)?) => {
+        $crate::test_helpers::BlockSpec::builder().with_name($name).finish()
+    };
+}
+
+/// Usage:
+/// ```ignore
+/// block_specs!(["1a->GB"], ["2a->1a"], ["3a->2a", difficulty: 2], ["4a->3a", reward: 50000]);
+/// ```
+#[macro_export]
+macro_rules! block_specs {
+    (@ { $specs:ident }) => {};
+
+    (@ { $specs:ident } [$name:expr, $($k:ident: $v:expr),*], $($tail:tt)*) => {
+        $specs.push($crate::block_spec!($name, $($k: $v),*));
+        block_specs!(@ { $specs } $($tail)*)
+    };
+
+    (@ { $specs:ident } [$name:expr $(,)?], $($tail:tt)*) => { block_specs!(@ { $specs } [$name,], $($tail)*) };
+
+    (@ { $specs:ident } [$name:expr $(,)?]$(,)?) => { block_specs!(@ { $specs } [$name,],) };
+
+    (@ { $specs:ident } [$name:expr, $($k:ident: $v:expr),* $(,)?] $(,)?) => { block_specs!(@ { $specs } [$name, $($k: $v),*],) };
+
+    // Entrypoints
+    ([$name:expr, $($k:ident: $v:expr),*], $($tail:tt)*) => {
+        #[allow(clippy::vec_init_then_push)]
+        {
+            let mut specs = Vec::new();
+            $crate::block_specs!(@ { specs } [$name, $($k: $v),*], $($tail)*);
+            BlockSpecs::from(specs)
+        }
+    };
+    ([$name:expr, $($k:ident: $v:expr),* $(,)?]
$(,)*) => {{ + $crate::block_specs!([$name, $($k: $v),*],) + }}; + + ([$name:expr], $($tail:tt)*) => {{ $crate::block_specs!([$name,], $($tail)*) }}; + + ([$name:expr]) => {{ $crate::block_specs!([$name,],) }}; + + () => { BlockSpecs::from(Vec::new()) }; +} + +#[derive(Debug, Clone)] +pub struct BlockSpec { + pub name: &'static str, + pub prev_block: &'static str, + pub version: u16, + pub difficulty: Difficulty, + pub block_time: u64, + pub reward_override: Option, + pub height_override: Option, + pub transactions: Vec, + pub skip_coinbase: bool, +} + +impl BlockSpec { + pub fn new() -> Self { + Default::default() + } + + pub fn builder() -> Self { + Default::default() + } + + pub fn with_name(mut self, name: &'static str) -> Self { + let mut split = name.splitn(2, "->"); + let name = split.next().unwrap_or(""); + self.name = name; + if let Some(prev_block) = split.next() { + self.prev_block = prev_block; + } + self + } + + pub fn with_prev_block(mut self, prev_block_name: &'static str) -> Self { + self.prev_block = prev_block_name; + self + } + + pub fn with_height(mut self, height: u64) -> Self { + self.height_override = Some(height); + self + } + + pub fn with_difficulty(mut self, difficulty: Difficulty) -> Self { + self.difficulty = difficulty; + self + } + + pub fn with_block_time(mut self, block_time: u64) -> Self { + self.block_time = block_time; + self + } + + pub fn with_reward(mut self, reward: MicroTari) -> Self { + self.reward_override = Some(reward); + self + } + + pub fn skip_coinbase(mut self) -> Self { + self.skip_coinbase = true; + self + } + + pub fn with_transactions(mut self, transactions: Vec) -> Self { + self.transactions = transactions; + self + } + + pub fn finish(self) -> Self { + self + } +} + +impl Default for BlockSpec { + fn default() -> Self { + Self { + name: "", + prev_block: "", + version: 0, + difficulty: 1.into(), + block_time: 120, + height_override: None, + reward_override: None, + transactions: vec![], + skip_coinbase: false, + } + } +} diff --git a/base_layer/core/src/test_helpers/blockchain.rs b/base_layer/core/src/test_helpers/blockchain.rs index c6e05896663..a98a3189472 100644 --- a/base_layer/core/src/test_helpers/blockchain.rs +++ b/base_layer/core/src/test_helpers/blockchain.rs @@ -49,7 +49,7 @@ use crate::{ consensus::{chain_strength_comparer::ChainStrengthComparerBuilder, ConsensusConstantsBuilder, ConsensusManager}, crypto::tari_utilities::Hashable, proof_of_work::{AchievedTargetDifficulty, Difficulty, PowAlgorithm}, - test_helpers::BlockSpec, + test_helpers::{block_spec::BlockSpecs, BlockSpec}, transactions::{ transaction::{TransactionInput, TransactionKernel, UnblindedOutput}, CryptoFactories, @@ -381,32 +381,25 @@ impl BlockchainBackend for TempDatabase { } } -pub fn create_chained_blocks( - blocks: &[(&str, u64, u64)], +pub fn create_chained_blocks>( + blocks: T, genesis_block: Arc, ) -> (Vec, HashMap>) { let mut block_hashes = HashMap::new(); block_hashes.insert("GB".to_string(), genesis_block); let rules = ConsensusManager::builder(Network::LocalNet).build(); - + let blocks: BlockSpecs = blocks.into(); let mut block_names = Vec::with_capacity(blocks.len()); - for (name, difficulty, time) in blocks { - let split = name.split("->").collect::>(); - let to = split[0].to_string(); - let from = split[1].to_string(); - + for block_spec in blocks { let prev_block = block_hashes - .get(&from) - .unwrap_or_else(|| panic!("Could not find block {}", from)); - let block_spec = BlockSpec::new() - .with_difficulty((*difficulty).into()) - 
.with_block_time(*time) - .finish(); + .get(block_spec.prev_block) + .unwrap_or_else(|| panic!("Could not find block {}", block_spec.prev_block)); + let name = block_spec.name; + let difficulty = block_spec.difficulty; let (block, _) = create_block(&rules, prev_block.block(), block_spec); - let block = mine_block(block, prev_block.accumulated_data(), (*difficulty).into()); - - block_names.push(to.clone()); - block_hashes.insert(to, block); + let block = mine_block(block, prev_block.accumulated_data(), difficulty); + block_names.push(name.to_string()); + block_hashes.insert(name.to_string(), block); } (block_names, block_hashes) } @@ -425,9 +418,9 @@ fn mine_block(block: Block, prev_block_accum: &BlockHeaderAccumulatedData, diffi Arc::new(ChainBlock::try_construct(Arc::new(block), accum).unwrap()) } -pub fn create_main_chain( +pub fn create_main_chain>( db: &BlockchainDatabase, - blocks: &[(&str, u64, u64)], + blocks: T, ) -> (Vec, HashMap>) { let genesis_block = db.fetch_block(0).unwrap().try_into_chain_block().map(Arc::new).unwrap(); let (names, chain) = create_chained_blocks(blocks, genesis_block); @@ -439,9 +432,9 @@ pub fn create_main_chain( (names, chain) } -pub fn create_orphan_chain( +pub fn create_orphan_chain>( db: &BlockchainDatabase, - blocks: &[(&str, u64, u64)], + blocks: T, root_block: Arc, ) -> (Vec, HashMap>) { let (names, chain) = create_chained_blocks(blocks, root_block); diff --git a/base_layer/core/src/test_helpers/mod.rs b/base_layer/core/src/test_helpers/mod.rs index 691623b2926..278d9791b8a 100644 --- a/base_layer/core/src/test_helpers/mod.rs +++ b/base_layer/core/src/test_helpers/mod.rs @@ -23,6 +23,10 @@ //! Common test helper functions that are small and useful enough to be included in the main crate, rather than the //! integration test folder. 
+#[macro_use] +mod block_spec; +pub use block_spec::{BlockSpec, BlockSpecs}; + pub mod blockchain; use crate::{ @@ -32,7 +36,6 @@ use crate::{ crypto::tari_utilities::Hashable, proof_of_work::{sha3_difficulty, AchievedTargetDifficulty, Difficulty}, transactions::{ - tari_amount::MicroTari, transaction::{Transaction, UnblindedOutput}, CoinbaseBuilder, CryptoFactories, @@ -43,64 +46,6 @@ use std::{iter, path::Path, sync::Arc}; use tari_comms::PeerManager; use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper}; -#[derive(Debug, Clone)] -pub struct BlockSpec { - version: u16, - difficulty: Difficulty, - block_time: u64, - reward_override: Option, - transactions: Vec, - skip_coinbase: bool, -} - -impl BlockSpec { - pub fn new() -> Self { - Default::default() - } - - pub fn with_difficulty(mut self, difficulty: Difficulty) -> Self { - self.difficulty = difficulty; - self - } - - pub fn with_block_time(mut self, block_time: u64) -> Self { - self.block_time = block_time; - self - } - - pub fn with_reward(mut self, reward: MicroTari) -> Self { - self.reward_override = Some(reward); - self - } - - pub fn skip_coinbase(mut self) -> Self { - self.skip_coinbase = true; - self - } - - pub fn with_transactions(mut self, transactions: Vec) -> Self { - self.transactions = transactions; - self - } - - pub fn finish(self) -> Self { - self - } -} - -impl Default for BlockSpec { - fn default() -> Self { - Self { - version: 0, - difficulty: 1.into(), - block_time: 120, - reward_override: None, - transactions: vec![], - skip_coinbase: false, - } - } -} - /// Create a partially constructed block using the provided set of transactions /// is chain_block, or rename it to `create_orphan_block` and drop the prev_block argument pub fn create_orphan_block(block_height: u64, transactions: Vec, consensus: &ConsensusManager) -> Block { @@ -111,7 +56,7 @@ pub fn create_orphan_block(block_height: u64, transactions: Vec, co pub fn create_block(rules: &ConsensusManager, prev_block: &Block, spec: BlockSpec) -> (Block, UnblindedOutput) { let mut header = BlockHeader::new(spec.version); - let block_height = prev_block.header.height + 1; + let block_height = spec.height_override.unwrap_or(prev_block.header.height + 1); header.height = block_height; header.prev_hash = prev_block.hash(); let reward = spec.reward_override.unwrap_or_else(|| { diff --git a/base_layer/core/src/validation/block_validators/body_only.rs b/base_layer/core/src/validation/block_validators/body_only.rs index 3c88d75640f..2e722654f76 100644 --- a/base_layer/core/src/validation/block_validators/body_only.rs +++ b/base_layer/core/src/validation/block_validators/body_only.rs @@ -46,22 +46,22 @@ impl PostOrphanBodyValidation for BodyOnlyValidator { /// 1. Are the block header MMR roots valid? 
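+    /// It also checks that the block connects to the current best chain: the header's prev_hash must match the
+    /// current best block, and the block height must be exactly one past the current chain tip.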
fn validate_body_for_valid_orphan( &self, - block: &ChainBlock, backend: &B, + block: &ChainBlock, metadata: &ChainMetadata, ) -> Result<(), ValidationError> { - if block.header().height != metadata.height_of_longest_chain() + 1 { - return Err(ValidationError::IncorrectNextTipHeight { - expected: metadata.height_of_longest_chain() + 1, - block_height: block.height(), - }); - } if block.header().prev_hash != *metadata.best_block() { return Err(ValidationError::IncorrectPreviousHash { expected: metadata.best_block().to_hex(), block_hash: block.hash().to_hex(), }); } + if block.height() != metadata.height_of_longest_chain() + 1 { + return Err(ValidationError::IncorrectNextTipHeight { + expected: metadata.height_of_longest_chain() + 1, + block_height: block.height(), + }); + } let block_id = format!("block #{} ({})", block.header().height, block.hash().to_hex()); helpers::check_inputs_are_utxos(backend, &block.block().body)?; diff --git a/base_layer/core/src/validation/chain_balance.rs b/base_layer/core/src/validation/chain_balance.rs index f6bf03e8987..582c4402d9e 100644 --- a/base_layer/core/src/validation/chain_balance.rs +++ b/base_layer/core/src/validation/chain_balance.rs @@ -55,10 +55,10 @@ impl ChainBalanceValidator { impl FinalHorizonStateValidation for ChainBalanceValidator { fn validate( &self, + backend: &B, height: u64, total_utxo_sum: &Commitment, total_kernel_sum: &Commitment, - backend: &B, ) -> Result<(), ValidationError> { let emission_h = self.get_emission_commitment_at(height); let total_offset = self.fetch_total_offset_commitment(height, backend)?; diff --git a/base_layer/core/src/validation/mocks.rs b/base_layer/core/src/validation/mocks.rs index 891e7125a05..d5d347133b2 100644 --- a/base_layer/core/src/validation/mocks.rs +++ b/base_layer/core/src/validation/mocks.rs @@ -82,7 +82,7 @@ impl BlockSyncBodyValidation for MockValidator { } impl PostOrphanBodyValidation for MockValidator { - fn validate_body_for_valid_orphan(&self, _: &ChainBlock, _: &B, _: &ChainMetadata) -> Result<(), ValidationError> { + fn validate_body_for_valid_orphan(&self, _: &B, _: &ChainBlock, _: &ChainMetadata) -> Result<(), ValidationError> { if self.is_valid.load(Ordering::SeqCst) { Ok(()) } else { @@ -143,10 +143,10 @@ impl MempoolTransactionValidation for MockValidator { impl FinalHorizonStateValidation for MockValidator { fn validate( &self, + _backend: &B, _height: u64, _total_utxo_sum: &Commitment, _total_kernel_sum: &Commitment, - _backend: &B, ) -> Result<(), ValidationError> { if self.is_valid.load(Ordering::SeqCst) { Ok(()) diff --git a/base_layer/core/src/validation/test.rs b/base_layer/core/src/validation/test.rs index f60f28a2656..886a878dcb2 100644 --- a/base_layer/core/src/validation/test.rs +++ b/base_layer/core/src/validation/test.rs @@ -137,7 +137,7 @@ fn chain_balance_validation() { let validator = ChainBalanceValidator::new(consensus_manager.clone(), factories.clone()); // Validate the genesis state validator - .validate(0, &utxo_sum, &kernel_sum, &*db.db_read_access().unwrap()) + .validate(&*db.db_read_access().unwrap(), 0, &utxo_sum, &kernel_sum) .unwrap(); //---------------------------------- Add a new coinbase and header --------------------------------------------// @@ -187,7 +187,7 @@ fn chain_balance_validation() { utxo_sum = &coinbase.commitment + &utxo_sum; kernel_sum = &kernel.excess + &kernel_sum; validator - .validate(1, &utxo_sum, &kernel_sum, &*db.db_read_access().unwrap()) + .validate(&*db.db_read_access().unwrap(), 1, &utxo_sum, &kernel_sum) .unwrap(); 
//---------------------------------- Try to inflate --------------------------------------------// @@ -231,6 +231,6 @@ fn chain_balance_validation() { db.commit(txn).unwrap(); validator - .validate(2, &utxo_sum, &kernel_sum, &*db.db_read_access().unwrap()) + .validate(&*db.db_read_access().unwrap(), 2, &utxo_sum, &kernel_sum) .unwrap_err(); } diff --git a/base_layer/core/src/validation/traits.rs b/base_layer/core/src/validation/traits.rs index ad77c387e28..684e68600a3 100644 --- a/base_layer/core/src/validation/traits.rs +++ b/base_layer/core/src/validation/traits.rs @@ -41,8 +41,8 @@ pub trait BlockSyncBodyValidation: Send + Sync { pub trait PostOrphanBodyValidation: Send + Sync { fn validate_body_for_valid_orphan( &self, - block: &ChainBlock, backend: &B, + block: &ChainBlock, metadata: &ChainMetadata, ) -> Result<(), ValidationError>; } @@ -67,9 +67,9 @@ pub trait HeaderValidation: Send + Sync { pub trait FinalHorizonStateValidation: Send + Sync { fn validate( &self, + backend: &B, height: u64, total_utxo_sum: &Commitment, total_kernel_sum: &Commitment, - backend: &B, ) -> Result<(), ValidationError>; } diff --git a/base_layer/core/tests/block_validation.rs b/base_layer/core/tests/block_validation.rs index 8b7303ed47b..a05688f5ef0 100644 --- a/base_layer/core/tests/block_validation.rs +++ b/base_layer/core/tests/block_validation.rs @@ -439,7 +439,7 @@ OutputFeatures::default()), let metadata = db.get_chain_metadata().unwrap(); // this block should be okay assert!(body_only_validator - .validate_body_for_valid_orphan(&chain_block, &*db.db_read_access().unwrap(), &metadata) + .validate_body_for_valid_orphan(&*db.db_read_access().unwrap(), &chain_block, &metadata) .is_ok()); // lets break the chain sequence @@ -464,7 +464,7 @@ OutputFeatures::default()), let chain_block = ChainBlock::try_construct(Arc::new(new_block), accumulated_data).unwrap(); let metadata = db.get_chain_metadata().unwrap(); assert!(body_only_validator - .validate_body_for_valid_orphan(&chain_block, &*db.db_read_access().unwrap(), &metadata) + .validate_body_for_valid_orphan(&*db.db_read_access().unwrap(), &chain_block, &metadata) .is_err()); // lets have unknown inputs; @@ -503,7 +503,7 @@ OutputFeatures::default()), let chain_block = ChainBlock::try_construct(Arc::new(new_block), accumulated_data).unwrap(); let metadata = db.get_chain_metadata().unwrap(); assert!(body_only_validator - .validate_body_for_valid_orphan(&chain_block, &*db.db_read_access().unwrap(), &metadata) + .validate_body_for_valid_orphan(&*db.db_read_access().unwrap(), &chain_block, &metadata) .is_err()); // lets check duplicate txos @@ -533,7 +533,7 @@ OutputFeatures::default()), let chain_block = ChainBlock::try_construct(Arc::new(new_block), accumulated_data).unwrap(); let metadata = db.get_chain_metadata().unwrap(); assert!(body_only_validator - .validate_body_for_valid_orphan(&chain_block, &*db.db_read_access().unwrap(), &metadata) + .validate_body_for_valid_orphan(&*db.db_read_access().unwrap(), &chain_block, &metadata) .is_err()); // check mmr roots @@ -560,7 +560,7 @@ OutputFeatures::default()), let chain_block = ChainBlock::try_construct(Arc::new(new_block), accumulated_data).unwrap(); let metadata = db.get_chain_metadata().unwrap(); assert!(body_only_validator - .validate_body_for_valid_orphan(&chain_block, &*db.db_read_access().unwrap(), &metadata) + .validate_body_for_valid_orphan(&*db.db_read_access().unwrap(), &chain_block, &metadata) .is_err()); }
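
A minimal usage sketch of the test helpers introduced above (`TestHarness`, `create_main_chain`,
`create_chained_blocks` and the `block_specs!` macro); the block names, difficulties and expected
reorg counts here are illustrative only:

    #[test]
    fn it_reorgs_to_a_stronger_fork() {
        let test = TestHarness::setup();
        // Main chain GB -> 1a -> 2a at the default difficulty.
        let (_, main_chain) = create_main_chain(&test.db, block_specs!(["1a->GB"], ["2a->1a"]));

        // A stronger fork 2b -> 3b branching off 1a (spec names are relative to the fork root).
        let fork_root = main_chain.get("1a").unwrap().clone();
        let (_, fork) = create_chained_blocks(
            block_specs!(["2b->GB", difficulty: 10], ["3b->2b", difficulty: 10]),
            fork_root,
        );

        // 3b arrives first and is held as an orphan ...
        test.handle_possible_reorg(fork.get("3b").unwrap().to_arc_block())
            .unwrap()
            .assert_orphaned();
        // ... and once 2b links the fork to 1a, two blocks are added and one (2a) is removed.
        test.handle_possible_reorg(fork.get("2b").unwrap().to_arc_block())
            .unwrap()
            .assert_reorg(2, 1);
    }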