Skip to content

Commit

Permalink
feat: miningcore transcoder (#3003)
Browse files Browse the repository at this point in the history
This PR adds the tari_stratum_ffi library and tari_stratum_transcoder application for use with Miningcore.

This PR also expands tari_mining_node with stratum support to interact with Miningcore.

See here for the Tari implementation in Miningcore:
Source: https://github.com/StriderDM/miningcore/tree/tari
Compare: coinfoundry/miningcore@master...StriderDM:tari

Edit: Functional, will need refactoring
  • Loading branch information
stringhandler committed Aug 3, 2021
2 parents 0e0bfe0 + 8bd46cd commit ee9a225
Show file tree
Hide file tree
Showing 52 changed files with 3,686 additions and 93 deletions.
317 changes: 271 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"base_layer/service_framework",
"base_layer/wallet",
"base_layer/wallet_ffi",
"base_layer/tari_stratum_ffi",
"comms",
"comms/dht",
"comms/rpc_macros",
Expand All @@ -20,5 +21,6 @@ members = [
"applications/test_faucet",
"applications/tari_app_utilities",
"applications/tari_merge_mining_proxy",
"applications/tari_stratum_transcoder",
"applications/tari_mining_node",
]
9 changes: 8 additions & 1 deletion applications/tari_mining_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ serde = { version = "1.0", default_features = false, features = ["derive"] }
tonic = { version = "0.2", features = ["transport"] }
tokio = { version = "0.2", default_features = false, features = ["rt-core"] }
thiserror = "1.0"

jsonrpc = "0.11.0"
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde_json = "1.0.57"
native-tls = "0.2"
bufstream = "0.1"
time = "0.1"
chrono = "0.4"
hex = "0.4.2"

[dev-dependencies]
tari_crypto = "0.11.1"
Expand Down
4 changes: 4 additions & 0 deletions applications/tari_mining_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct MinerConfig {
pub mine_on_tip_only: bool,
pub proof_of_work_algo: ProofOfWork,
pub validate_tip_timeout_sec: u64,
pub mining_pool_address: String,
pub mining_wallet_address: String,
}

#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -71,6 +73,8 @@ impl Default for MinerConfig {
mine_on_tip_only: true,
proof_of_work_algo: ProofOfWork::Sha3,
validate_tip_timeout_sec: 30,
mining_pool_address: "".to_string(),
mining_wallet_address: "".to_string(),
}
}
}
Expand Down
158 changes: 112 additions & 46 deletions applications/tari_mining_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,33 @@ mod config;
mod difficulty;
mod errors;
mod miner;
mod stratum;
mod utils;

use crate::miner::MiningReport;
use crate::{
miner::MiningReport,
stratum::{stratum_controller::controller::Controller, stratum_miner::miner::StratumMiner},
};
use errors::{err_empty, MinerError};
use miner::Miner;
use std::{convert::TryFrom, time::Instant};
use std::{
convert::TryFrom,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Instant,
};

/// Application entry point
fn main() {
let mut rt = Runtime::new().expect("Failed to start tokio runtime");
match rt.block_on(main_inner()) {
Ok(_) => std::process::exit(0),
Err(exit_code) => {
eprintln!("Fatal error: {}", exit_code);
error!("Exiting with code: {}", exit_code);
eprintln!("Fatal error: {:?}", exit_code);
error!("Exiting with code: {:?}", exit_code);
std::process::exit(exit_code.as_i32())
},
}
Expand All @@ -64,53 +76,107 @@ async fn main_inner() -> Result<(), ExitCodes> {
debug!("{:?}", bootstrap);
debug!("{:?}", config);

let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?;
if !config.mining_wallet_address.is_empty() && !config.mining_pool_address.is_empty() {
let url = config.mining_pool_address.clone();
let miner_address = config.mining_wallet_address.clone();
let mut mc = Controller::new().unwrap_or_else(|e| {
panic!("Error loading mining controller: {}", e);
});
let cc = stratum::controller::Controller::new(&url, Some(miner_address), None, None, mc.tx.clone())
.unwrap_or_else(|e| {
panic!("Error loading stratum client controller: {:?}", e);
});
let miner_stopped = Arc::new(AtomicBool::new(false));
let client_stopped = Arc::new(AtomicBool::new(false));

let mut blocks_found: u64 = 0;
loop {
debug!("Starting new mining cycle");
match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await {
err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => {
// Any GRPC error we will try to reconnect with a standard delay
error!("Connection error: {:?}", err);
loop {
mc.set_client_tx(cc.tx.clone());
let mut miner = StratumMiner::new(config);
if let Err(e) = miner.start_solvers() {
println!("Error. Please check logs for further info.");
println!("Error details:");
println!("{:?}", e);
println!("Exiting");
}

let miner_stopped_internal = miner_stopped.clone();
let _ = thread::Builder::new()
.name("mining_controller".to_string())
.spawn(move || {
if let Err(e) = mc.run(miner) {
error!("Error. Please check logs for further info: {:?}", e);
return;
}
miner_stopped_internal.store(true, Ordering::Relaxed);
});

let client_stopped_internal = client_stopped.clone();
let _ = thread::Builder::new()
.name("client_controller".to_string())
.spawn(move || {
cc.run();
client_stopped_internal.store(true, Ordering::Relaxed);
});

loop {
if miner_stopped.load(Ordering::Relaxed) && client_stopped.load(Ordering::Relaxed) {
thread::sleep(std::time::Duration::from_millis(100));
break;
}
thread::sleep(std::time::Duration::from_millis(100));
}
Ok(())
} else {
config.mine_on_tip_only = global.mine_on_tip_only;
debug!("mine_on_tip_only is {}", config.mine_on_tip_only);

let (mut node_conn, mut wallet_conn) = connect(&config, &global).await.map_err(ExitCodes::grpc)?;

let mut blocks_found: u64 = 0;
loop {
debug!("Starting new mining cycle");
match mining_cycle(&mut node_conn, &mut wallet_conn, &config, &bootstrap).await {
err @ Err(MinerError::GrpcConnection(_)) | err @ Err(MinerError::GrpcStatus(_)) => {
// Any GRPC error we will try to reconnect with a standard delay
error!("Connection error: {:?}", err);
loop {
debug!("Holding for {:?}", config.wait_timeout());
delay_for(config.wait_timeout()).await;
match connect(&config, &global).await {
Ok((nc, wc)) => {
node_conn = nc;
wallet_conn = wc;
break;
},
Err(err) => {
error!("Connection error: {:?}", err);
continue;
},
}
}
},
Err(MinerError::MineUntilHeightReached(h)) => {
info!("Prescribed blockchain height {} reached. Aborting ...", h);
return Ok(());
},
Err(MinerError::MinerLostBlock(h)) => {
info!("Height {} already mined by other node. Restarting ...", h);
},
Err(err) => {
error!("Error: {:?}", err);
debug!("Holding for {:?}", config.wait_timeout());
delay_for(config.wait_timeout()).await;
match connect(&config, &global).await {
Ok((nc, wc)) => {
node_conn = nc;
wallet_conn = wc;
break;
},
Err(err) => {
error!("Connection error: {:?}", err);
continue;
},
},
Ok(submitted) => {
if submitted {
blocks_found += 1;
}
}
},
Err(MinerError::MineUntilHeightReached(h)) => {
info!("Prescribed blockchain height {} reached. Aborting ...", h);
return Ok(());
},
Err(MinerError::MinerLostBlock(h)) => {
info!("Height {} already mined by other node. Restarting ...", h);
},
Err(err) => {
error!("Error: {:?}", err);
debug!("Holding for {:?}", config.wait_timeout());
delay_for(config.wait_timeout()).await;
},
Ok(submitted) => {
if submitted {
blocks_found += 1;
}
if let Some(max_blocks) = bootstrap.miner_max_blocks {
if blocks_found >= max_blocks {
return Ok(());
if let Some(max_blocks) = bootstrap.miner_max_blocks {
if blocks_found >= max_blocks {
return Ok(());
}
}
}
},
},
}
}
}
}
Expand Down
Loading

0 comments on commit ee9a225

Please sign in to comment.