diff --git a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs
index 6f328b4404..d99c2e6183 100644
--- a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs
+++ b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs
@@ -446,7 +446,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
             asset_pub_key_hex
         );
 
-        for token in tokens {
+        for (token, mined_height) in tokens {
             let features = match token.features.clone().try_into() {
                 Ok(f) => f,
                 Err(err) => {
@@ -465,7 +465,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
                 unique_id: token.features.unique_id.unwrap_or_default(),
                 owner_commitment: token.commitment.to_vec(),
                 mined_in_block: vec![],
-                mined_height: 0,
+                mined_height,
                 script: token.script.as_bytes(),
                 features: Some(features),
             }))
diff --git a/applications/tari_collectibles/web-app/src/binding.js b/applications/tari_collectibles/web-app/src/binding.js
index c4778c4fc9..780c221514 100644
--- a/applications/tari_collectibles/web-app/src/binding.js
+++ b/applications/tari_collectibles/web-app/src/binding.js
@@ -53,7 +53,7 @@ async function command_assets_get_registration(assetPublicKey) {
 
 async function command_asset_create_initial_checkpoint(assetPublicKey) {
   return await invoke("assets_create_initial_checkpoint", {
-    assetPublicKey,
+    assetPubKey: assetPublicKey,
   });
 }
 
diff --git a/applications/tari_validator_node/src/asset.rs b/applications/tari_validator_node/src/asset.rs
new file mode 100644
index 0000000000..e88195ef69
--- /dev/null
+++ b/applications/tari_validator_node/src/asset.rs
@@ -0,0 +1,91 @@
+// Copyright 2022. The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::{
+    collections::HashMap,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+};
+
+use tari_dan_core::models::AssetDefinition;
+
+#[derive(Debug)]
+pub struct Asset {
+    definition: AssetDefinition,
+    current_state: bool,
+    // Changes in the committee for this asset.
+    // Mined height of the change TXs, and the involvement in the committee (true = part of the committee)
+    next_states: HashMap<u64, bool>,
+    kill_signal: Option<Arc<AtomicBool>>,
+}
+
+impl Asset {
+    pub fn new(definition: AssetDefinition) -> Self {
+        Self {
+            definition,
+            current_state: false,
+            next_states: HashMap::new(),
+            kill_signal: None,
+        }
+    }
+
+    pub fn update_height<Fstart>(&mut self, height: u64, start: Fstart)
+    where Fstart: Fn(AssetDefinition) -> Arc<AtomicBool> {
+        if let Some((&height, &involvement)) = self
+            .next_states
+            .iter()
+            .find(|(&mined_height, _)| mined_height <= height)
+        {
+            // State change
+            if self.current_state != involvement {
+                if involvement {
+                    self.kill_signal = Some(start(self.definition.clone()));
+                } else {
+                    // Switch on the kill signal for the asset to end processing
+                    let stop = self.kill_signal.clone().unwrap();
+                    stop.as_ref().store(true, Ordering::Relaxed);
+                    self.kill_signal = None;
+                }
+            }
+            self.current_state = involvement;
+            // We have the current state set and we will keep only future updates
+            self.next_states
+                .retain(|&effective_height, _| effective_height > height);
+            // Monitor this asset if we are part of the committee or there is a next state
+        }
+    }
+
+    // If we are part of the committee or there is a next state then monitor this asset
+    pub fn should_monitor(&self) -> bool {
+        self.current_state || !self.next_states.is_empty()
+    }
+
+    pub fn add_state(&mut self, height: u64, involvement: bool) {
+        self.next_states.insert(height, involvement);
+    }
+
+    pub fn is_committee_member(&self) -> bool {
+        self.current_state
+    }
+}
diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs
index 57f9b2be79..82078aa0bb 100644
--- a/applications/tari_validator_node/src/dan_node.rs
+++ b/applications/tari_validator_node/src/dan_node.rs
@@ -20,7 +20,10 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{
+    sync::{atomic::AtomicBool, Arc},
+    time::Duration,
+};
 
 use log::*;
 use tari_common::{
@@ -28,6 +31,7 @@ use tari_common::{
     exit_codes::{ExitCode, ExitError},
     GlobalConfig,
 };
+use tari_common_types::types::PublicKey;
 use tari_comms::{types::CommsPublicKey, NodeIdentity};
 use tari_comms_dht::Dht;
 use tari_crypto::tari_utilities::hex::Hex;
@@ -55,6 +59,7 @@ use tokio::{task, time};
 use crate::{
     default_service_specification::DefaultServiceSpecification,
     grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
+    monitoring::Monitoring,
     p2p::services::{
         inbound_connection_service::TariCommsInboundConnectionService,
         outbound_connection_service::TariCommsOutboundService,
@@ -89,9 +94,9 @@ impl DanNode {
             .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Missing dan section"))?;
         let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address);
-        #[allow(clippy::mutable_key_type)]
-        let mut tasks = HashMap::new();
         let mut next_scanned_height = 0u64;
+        let mut last_tip = 0u64;
+        let mut monitoring = Monitoring::new(dan_config.committee_management_confirmation_time);
         loop {
             let tip = base_node_client
                 .get_tip_info()
                 .await
@@ -103,12 +108,13 @@ impl DanNode {
                     "Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
                );
                if dan_config.scan_for_assets {
-                    next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval;
+                    next_scanned_height =
+                        tip.height_of_longest_chain + dan_config.committee_management_polling_interval;
                    info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
                } else {
                    next_scanned_height = u64::MAX; // Never run again.
                }
-                let assets = base_node_client
+                let mut assets = base_node_client
                    .get_assets_for_dan_node(node_identity.public_key().clone())
                    .await
                    .map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
@@ -117,54 +123,64 @@ impl DanNode {
                    "Base node returned {} asset(s) to process",
                    assets.len()
                );
-                for asset in assets {
-                    if tasks.contains_key(&asset.public_key) {
-                        debug!(
-                            target: LOG_TARGET,
-                            "Asset task already running for asset '{}'", asset.public_key
-                        );
-                        continue;
-                    }
-                    if let Some(allow_list) = &dan_config.assets_allow_list {
-                        if !allow_list.contains(&asset.public_key.to_hex()) {
-                            debug!(
-                                target: LOG_TARGET,
-                                "Asset '{}' is not allowlisted for processing ", asset.public_key
-                            );
-                            continue;
+                if let Some(allow_list) = &dan_config.assets_allow_list {
+                    assets.retain(|(asset, _)| allow_list.contains(&asset.public_key.to_hex()));
+                }
+                for (asset, mined_height) in assets.clone() {
+                    monitoring.add_if_unmonitored(asset.clone());
+                    monitoring.add_state(asset.public_key, mined_height, true);
+                }
+                let mut known_active_public_keys = assets.into_iter().map(|(asset, _)| asset.public_key);
+                let active_public_keys = monitoring
+                    .get_active_public_keys()
+                    .into_iter()
+                    .cloned()
+                    .collect::<Vec<PublicKey>>();
+                for public_key in active_public_keys {
+                    if !known_active_public_keys.any(|pk| pk == public_key) {
+                        // The active asset is not among the newly known active assets; maybe there was no checkpoint for
+                        // the asset. Are we still part of the committee?
+                        if let (false, height) = base_node_client
+                            .check_if_in_committee(public_key.clone(), node_identity.public_key().clone())
+                            .await
+                            .unwrap()
+                        {
+                            // We are not part of the latest committee, set the state to false
+                            monitoring.add_state(public_key.clone(), height, false)
                        }
                    }
-                info!(target: LOG_TARGET, "Adding asset '{}'", asset.public_key);
+                }
+            }
+            if tip.height_of_longest_chain > last_tip {
+                last_tip = tip.height_of_longest_chain;
+                monitoring.update_height(last_tip, |asset| {
                    let node_identity = node_identity.as_ref().clone();
                    let mempool = mempool_service.clone();
                    let handles = handles.clone();
                    let subscription_factory = subscription_factory.clone();
                    let shutdown = shutdown.clone();
+                    // Create a kill signal for each asset
+                    let kill = Arc::new(AtomicBool::new(false));
                    let dan_config = dan_config.clone();
                    let db_factory = db_factory.clone();
-                    tasks.insert(
-                        asset.public_key.clone(),
-                        task::spawn(DanNode::start_asset_worker(
-                            asset,
-                            node_identity,
-                            mempool,
-                            handles,
-                            subscription_factory,
-                            shutdown,
-                            dan_config,
-                            db_factory,
-                        )),
-                    );
-                }
+                    task::spawn(DanNode::start_asset_worker(
+                        asset,
+                        node_identity,
+                        mempool,
+                        handles,
+                        subscription_factory,
+                        shutdown,
+                        dan_config,
+                        db_factory,
+                        kill.clone(),
+                    ));
+                    kill
+                });
            }
            time::sleep(Duration::from_secs(120)).await;
        }
    }
 
-    // async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
-    //     todo!()
-    // }
-
     pub async fn start_asset_worker(
        asset_definition: AssetDefinition,
        node_identity: NodeIdentity,
@@ -174,6 +190,7 @@ impl DanNode {
        shutdown: ShutdownSignal,
        config: ValidatorNodeConfig,
        db_factory: SqliteDbFactory,
+        kill: Arc<AtomicBool>,
    ) -> Result<(), ExitError> {
        let timeout = Duration::from_secs(asset_definition.phase_timeout);
        let committee = asset_definition
@@ -235,11 +252,15 @@ impl DanNode {
            validator_node_client_factory,
        );
 
-        if let Err(err) = consensus_worker.run(shutdown.clone(), None).await {
+        if let Err(err) = consensus_worker.run(shutdown.clone(), None, kill).await {
            error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
            return Err(ExitError::new(ExitCode::UnknownError, err));
        }
        Ok(())
    }
+
+    // async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
+    //     todo!()
+    // }
 
 }
diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs
index b86198aaeb..b31bc98874 100644
--- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs
+++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs
@@ -93,17 +93,53 @@ impl BaseNodeClient for GrpcBaseNodeClient {
         let output = outputs
             .first()
             .map(|o| match o.features.clone().unwrap().try_into() {
-                Ok(f) => Ok(BaseLayerOutput { features: f }),
+                Ok(f) => Ok(BaseLayerOutput {
+                    features: f,
+                    height: o.mined_height,
+                }),
                 Err(e) => Err(DigitalAssetError::ConversionError(e)),
             })
             .transpose()?;
         Ok(output)
     }
 
+    async fn check_if_in_committee(
+        &mut self,
+        asset_public_key: PublicKey,
+        dan_node_public_key: PublicKey,
+    ) -> Result<(bool, u64), DigitalAssetError> {
+        let tip = self.get_tip_info().await?;
+        if let Some(checkpoint) = self
+            .get_current_checkpoint(
+                tip.height_of_longest_chain,
+                asset_public_key,
+                COMMITTEE_DEFINITION_ID.into(),
+            )
+            .await?
+        {
+            if let Some(committee) = checkpoint.get_side_chain_committee() {
+                if committee.contains(&dan_node_public_key) {
+                    // We know it's part of the committee at this height
+                    // TODO: there could be a scenario where it was not part of the committee for one block (or more,
+                    // depends on the config)
+                    Ok((true, checkpoint.height))
+                } else {
+                    // We know it's no longer part of the committee at this height
+                    // TODO: if the committee changes twice in a short period of time, this will cause some glitches
+                    Ok((false, checkpoint.height))
+                }
+            } else {
+                Ok((false, 0))
+            }
+        } else {
+            Ok((false, 0))
+        }
+    }
+
     async fn get_assets_for_dan_node(
         &mut self,
         dan_node_public_key: PublicKey,
-    ) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
+    ) -> Result<Vec<(AssetDefinition, u64)>, DigitalAssetError> {
         let inner = match self.inner.as_mut() {
             Some(i) => i,
             None => {
@@ -114,7 +150,7 @@ impl BaseNodeClient for GrpcBaseNodeClient {
         // TODO: probably should use output mmr indexes here
         let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 100 };
         let mut result = inner.list_asset_registrations(request).await?.into_inner();
-        let mut assets: Vec<AssetDefinition> = vec![];
+        let mut assets: Vec<(AssetDefinition, u64)> = vec![];
         let tip = self.get_tip_info().await?;
         while let Some(r) = result.message().await? {
             if let Ok(asset_public_key) = PublicKey::from_bytes(r.asset_public_key.as_bytes()) {
@@ -133,20 +169,23 @@ impl BaseNodeClient for GrpcBaseNodeClient {
                         "Node is on committee for asset : {}", asset_public_key
                     );
                     let committee = committee.iter().map(Hex::to_hex).collect::<Vec<String>>();
-                    assets.push(AssetDefinition {
-                        committee,
-                        public_key: asset_public_key,
-                        template_parameters: r
-                            .features
-                            .unwrap()
-                            .asset
-                            .unwrap()
-                            .template_parameters
-                            .into_iter()
-                            .map(|tp| tp.into())
-                            .collect(),
-                        ..Default::default()
-                    });
+                    assets.push((
+                        AssetDefinition {
+                            committee,
+                            public_key: asset_public_key,
+                            template_parameters: r
+                                .features
+                                .unwrap()
+                                .asset
+                                .unwrap()
+                                .template_parameters
+                                .into_iter()
+                                .map(|tp| tp.into())
+                                .collect(),
+                            ..Default::default()
+                        },
+                        checkpoint.height,
+                    ));
                }
            }
        }
@@ -172,10 +211,14 @@ impl BaseNodeClient for GrpcBaseNodeClient {
        };
        let output = conn.get_asset_metadata(req).await.unwrap().into_inner();
+        let mined_height = output.mined_height;
 
        let output = output
            .features
            .map(|features| match features.try_into() {
-                Ok(f) => Ok(BaseLayerOutput { features: f }),
+                Ok(f) => Ok(BaseLayerOutput {
+                    features: f,
+                    height: mined_height,
+                }),
                Err(e) => Err(DigitalAssetError::ConversionError(e)),
            })
            .transpose()?;
diff --git a/applications/tari_validator_node/src/main.rs b/applications/tari_validator_node/src/main.rs
index f6c5ac8b22..27d143880a 100644
--- a/applications/tari_validator_node/src/main.rs
+++ b/applications/tari_validator_node/src/main.rs
@@ -21,11 +21,13 @@
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 #![allow(clippy::too_many_arguments)]
+mod asset;
 mod cmd_args;
 mod comms;
 mod dan_node;
 mod default_service_specification;
 mod grpc;
+mod monitoring;
 mod p2p;
 
 use std::{
diff --git a/applications/tari_validator_node/src/monitoring.rs b/applications/tari_validator_node/src/monitoring.rs
new file mode 100644
index 0000000000..aef8656953
--- /dev/null
+++ b/applications/tari_validator_node/src/monitoring.rs
@@ -0,0 +1,76 @@
+// Copyright 2022. The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::{
+    collections::HashMap,
+    sync::{atomic::AtomicBool, Arc},
+};
+
+use tari_common_types::types::PublicKey;
+use tari_dan_core::models::AssetDefinition;
+
+use crate::asset::Asset;
+
+#[derive(Debug)]
+pub struct Monitoring {
+    committee_management_confirmation_time: u64,
+    assets: HashMap<PublicKey, Asset>,
+}
+
+impl Monitoring {
+    pub fn new(committee_management_confirmation_time: u64) -> Self {
+        Self {
+            committee_management_confirmation_time,
+            assets: HashMap::new(),
+        }
+    }
+
+    pub fn add_if_unmonitored(&mut self, asset: AssetDefinition) {
+        if !self.assets.contains_key(&asset.public_key) {
+            self.assets.insert(asset.public_key.clone(), Asset::new(asset));
+        }
+    }
+
+    pub fn add_state(&mut self, asset_public_key: PublicKey, height: u64, involvement: bool) {
+        // Effective height = mined height + committee_management_confirmation_time
+        self.assets
+            .get_mut(&asset_public_key)
+            .unwrap()
+            .add_state(height + self.committee_management_confirmation_time, involvement);
+    }
+
+    pub fn update_height<Fstart>(&mut self, height: u64, start: Fstart)
+    where Fstart: Fn(AssetDefinition) -> Arc<AtomicBool> + Clone {
+        for proc in self.assets.values_mut() {
+            proc.update_height(height, start.clone());
+        }
+        self.assets.retain(|_, proc| proc.should_monitor())
+    }
+
+    // Get the active public keys, so we can check whether we are still part of the committee
+    pub fn get_active_public_keys(&self) -> Vec<&PublicKey> {
+        self.assets
+            .keys()
+            .filter(|&a| self.assets.get(a).unwrap().is_committee_member())
+            .collect()
+    }
+}
diff --git a/base_layer/core/src/base_node/comms_interface/comms_response.rs b/base_layer/core/src/base_node/comms_interface/comms_response.rs
index 2089ac2155..a9d5303923 100644
--- a/base_layer/core/src/base_node/comms_interface/comms_response.rs
+++ b/base_layer/core/src/base_node/comms_interface/comms_response.rs
@@ -57,7 +57,7 @@ pub enum NodeCommsResponse {
     FetchHeadersAfterResponse(Vec),
     MmrNodes(Vec, Vec),
     FetchTokensResponse {
-        outputs: Vec<TransactionOutput>,
+        outputs: Vec<(TransactionOutput, u64)>,
     },
     FetchAssetRegistrationsResponse {
         outputs: Vec,
diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
index 7821833c61..fdf58995af 100644
--- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
+++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
@@ -432,11 +432,12 @@ where B: BlockchainBackend + 'static
                 .fetch_all_unspent_by_parent_public_key(asset_public_key.clone(), 0..1000)
                 .await?
             {
+                let mined_height = output.mined_height;
                 match output.output {
                     PrunedOutput::Pruned { .. } => {
                         // TODO: should we return this?
                     },
-                    PrunedOutput::NotPruned { output } => outputs.push(output),
+                    PrunedOutput::NotPruned { output } => outputs.push((output, mined_height)),
                 }
             }
         } else {
@@ -450,7 +451,7 @@ where B: BlockchainBackend + 'static
                     PrunedOutput::Pruned { .. } => {
                         // TODO: should we return this?
                     },
-                    PrunedOutput::NotPruned { output } => outputs.push(output),
+                    PrunedOutput::NotPruned { output } => outputs.push((output, out.mined_height)),
                 }
             }
         }
diff --git a/base_layer/core/src/base_node/comms_interface/local_interface.rs b/base_layer/core/src/base_node/comms_interface/local_interface.rs
index 620159bc36..04a383656c 100644
--- a/base_layer/core/src/base_node/comms_interface/local_interface.rs
+++ b/base_layer/core/src/base_node/comms_interface/local_interface.rs
@@ -277,7 +277,7 @@ impl LocalNodeCommsInterface {
         &mut self,
         asset_public_key: PublicKey,
         unique_ids: Vec<Vec<u8>>,
-    ) -> Result<Vec<TransactionOutput>, CommsInterfaceError> {
+    ) -> Result<Vec<(TransactionOutput, u64)>, CommsInterfaceError> {
         match self
             .request_sender
             .call(NodeCommsRequest::FetchTokens {
diff --git a/base_layer/wallet/src/assets/asset_manager.rs b/base_layer/wallet/src/assets/asset_manager.rs
index 2cceb11e7d..4c99530845 100644
--- a/base_layer/wallet/src/assets/asset_manager.rs
+++ b/base_layer/wallet/src/assets/asset_manager.rs
@@ -266,12 +266,7 @@ impl AssetManager {
 
         let (tx_id, transaction) = self
             .output_manager
-            .create_send_to_self_with_output(
-                vec![output],
-                ASSET_FPG.into(),
-                Some(COMMITTEE_DEFINITION_ID.into()),
-                Some(asset_public_key),
-            )
+            .create_send_to_self_with_output(vec![output], ASSET_FPG.into(), None, None)
             .await?;
 
         Ok((tx_id, transaction))
diff --git a/common/config/presets/validator_node.toml b/common/config/presets/validator_node.toml
index 735cd0def8..832122e2ff 100644
--- a/common/config/presets/validator_node.toml
+++ b/common/config/presets/validator_node.toml
@@ -16,3 +16,6 @@ scan_for_assets = true
 new_asset_scanning_interval = 10
 # If set then only the specific assets will be checked.
 # assets_allow_list = [""]
+
+committee_management_polling_interval = 5
+committee_management_confirmation_time = 20
\ No newline at end of file
diff --git a/common/src/configuration/validator_node_config.rs b/common/src/configuration/validator_node_config.rs
index 83fc31c162..d311f804eb 100644
--- a/common/src/configuration/validator_node_config.rs
+++ b/common/src/configuration/validator_node_config.rs
@@ -46,6 +46,8 @@ pub struct ValidatorNodeConfig {
     #[serde(default = "default_asset_scanning_interval")]
     pub new_asset_scanning_interval: u64,
     pub assets_allow_list: Option<Vec<String>>,
+    pub committee_management_polling_interval: u64,
+    pub committee_management_confirmation_time: u64,
 }
 
 fn default_true() -> bool {
diff --git a/dan_layer/core/src/models/base_layer_output.rs b/dan_layer/core/src/models/base_layer_output.rs
index 93c9d607dd..68f3ced298 100644
--- a/dan_layer/core/src/models/base_layer_output.rs
+++ b/dan_layer/core/src/models/base_layer_output.rs
@@ -31,6 +31,7 @@ use crate::{fixed_hash::FixedHash, models::ModelError};
 #[derive(Debug)]
 pub struct BaseLayerOutput {
     pub features: OutputFeatures,
+    pub height: u64,
 }
 
 impl BaseLayerOutput {
diff --git a/dan_layer/core/src/services/base_node_client.rs b/dan_layer/core/src/services/base_node_client.rs
index 2afbacab4d..c6fae1207e 100644
--- a/dan_layer/core/src/services/base_node_client.rs
+++ b/dan_layer/core/src/services/base_node_client.rs
@@ -39,10 +39,16 @@ pub trait BaseNodeClient {
         checkpoint_unique_id: Vec<u8>,
     ) -> Result<Option<BaseLayerOutput>, DigitalAssetError>;
 
+    async fn check_if_in_committee(
+        &mut self,
+        asset_public_key: PublicKey,
+        dan_node_public_key: PublicKey,
+    ) -> Result<(bool, u64), DigitalAssetError>;
+
     async fn get_assets_for_dan_node(
         &mut self,
         dan_node_public_key: PublicKey,
-    ) -> Result<Vec<AssetDefinition>, DigitalAssetError>;
+    ) -> Result<Vec<(AssetDefinition, u64)>, DigitalAssetError>;
 
     async fn get_asset_registration(
         &mut self,
diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs
index d18914c4e0..7f8e95d53f 100644
--- a/dan_layer/core/src/services/mocks/mod.rs
+++ b/dan_layer/core/src/services/mocks/mod.rs
@@ -199,10 +199,18 @@ impl BaseNodeClient for MockBaseNodeClient {
         todo!();
     }
 
+    async fn check_if_in_committee(
+        &mut self,
+        _asset_public_key: PublicKey,
+        _dan_node_public_key: PublicKey,
+    ) -> Result<(bool, u64), DigitalAssetError> {
+        todo!();
+    }
+
     async fn get_assets_for_dan_node(
         &mut self,
         _dan_node_public_key: PublicKey,
-    ) -> Result<Vec<AssetDefinition>, DigitalAssetError> {
+    ) -> Result<Vec<(AssetDefinition, u64)>, DigitalAssetError> {
         todo!();
     }
 
diff --git a/dan_layer/core/src/workers/consensus_worker.rs b/dan_layer/core/src/workers/consensus_worker.rs
index a626d241d8..710b1af8fa 100644
--- a/dan_layer/core/src/workers/consensus_worker.rs
+++ b/dan_layer/core/src/workers/consensus_worker.rs
@@ -20,6 +20,11 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc,
+};
+
 use log::*;
 use tari_common_types::types::PublicKey;
 use tari_shutdown::ShutdownSignal;
@@ -115,6 +120,7 @@ impl> ConsensusWorker
         max_views_to_process: Option<u64>,
+        stop: Arc<AtomicBool>,
     ) -> Result<(), DigitalAssetError> {
         let chain_db = self
             .db_factory
@@ -128,7 +134,7 @@ impl> ConsensusWorker
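
Reviewer note: a minimal usage sketch (not part of the patch) of the committee-monitoring flow introduced in `asset.rs` and `monitoring.rs`. The free function, the literal heights, and the `definition`/`public_key` inputs below are hypothetical; in the node, the closure passed to `update_height` spawns `start_asset_worker` and hands back that worker's kill signal.

```rust
use std::sync::{atomic::AtomicBool, Arc};

use tari_common_types::types::PublicKey;
use tari_dan_core::models::AssetDefinition;

use crate::monitoring::Monitoring;

// `public_key` is assumed to equal `definition.public_key`.
fn drive_monitoring(definition: AssetDefinition, public_key: PublicKey) {
    // Committee changes only take effect 20 blocks after the checkpoint is mined
    // (committee_management_confirmation_time = 20 in the preset).
    let mut monitoring = Monitoring::new(20);
    monitoring.add_if_unmonitored(definition);

    // A checkpoint mined at height 100 includes this node in the committee,
    // so the effective height of the change is 120.
    monitoring.add_state(public_key.clone(), 100, true);

    // Once the tip passes 120, the `start` closure runs and the returned
    // kill signal is stored for the asset.
    monitoring.update_height(121, |_asset| Arc::new(AtomicBool::new(false)));

    // A later checkpoint at height 200 drops the node from the committee;
    // once the tip passes 220, the stored kill signal is flipped and the
    // consensus worker for this asset stops.
    monitoring.add_state(public_key, 200, false);
    monitoring.update_height(221, |_asset| Arc::new(AtomicBool::new(false)));
}
```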