feat: update committee from the committee definitions TXs (#3911)
Description
---
Update the validator node based on the committee definitions.
Also fixes a bug where I was not able to create an asset.

How Has This Been Tested?
---
Manually (only the adding path, since we don't have a committee-change option yet), but the rest should work fine.
Cifko authored Apr 4, 2022
1 parent a7d8c27 commit 0b29c89
Showing 17 changed files with 329 additions and 74 deletions.
4 changes: 2 additions & 2 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
@@ -446,7 +446,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
                    asset_pub_key_hex
                );

-                for token in tokens {
+                for (token, mined_height) in tokens {
                    let features = match token.features.clone().try_into() {
                        Ok(f) => f,
                        Err(err) => {
@@ -465,7 +465,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
                            unique_id: token.features.unique_id.unwrap_or_default(),
                            owner_commitment: token.commitment.to_vec(),
                            mined_in_block: vec![],
-                            mined_height: 0,
+                            mined_height,
                            script: token.script.as_bytes(),
                            features: Some(features),
                        }))
2 changes: 1 addition & 1 deletion applications/tari_collectibles/web-app/src/binding.js
@@ -53,7 +53,7 @@ async function command_assets_get_registration(assetPublicKey) {

async function command_asset_create_initial_checkpoint(assetPublicKey) {
  return await invoke("assets_create_initial_checkpoint", {
-    assetPublicKey,
+    assetPubKey: assetPublicKey,
  });
}
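
For context on the rename: Tauri derives the expected JavaScript payload key from the Rust command's parameter name by converting snake_case to camelCase, so the key must be `assetPubKey` rather than `assetPublicKey`. A hypothetical sketch of the command this call targets (the real handler lives elsewhere in the collectibles app and is not part of this diff):

#[tauri::command]
async fn assets_create_initial_checkpoint(asset_pub_key: String) -> Result<(), String> {
    // Tauri maps this snake_case parameter to the camelCase invoke key
    // `assetPubKey`, which is why the JavaScript call above had to change.
    println!("creating initial checkpoint for asset {}", asset_pub_key);
    Ok(())
}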

91 changes: 91 additions & 0 deletions applications/tari_validator_node/src/asset.rs
@@ -0,0 +1,91 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use tari_dan_core::models::AssetDefinition;

#[derive(Debug)]
pub struct Asset {
    definition: AssetDefinition,
    current_state: bool,
    // Changes in the committee for this asset.
    // Mined height of the change TXs, and the involvement in the committee (true = part of committee)
    next_states: HashMap<u64, bool>,
    kill_signal: Option<Arc<AtomicBool>>,
}

impl Asset {
    pub fn new(definition: AssetDefinition) -> Self {
        Self {
            definition,
            current_state: false,
            next_states: HashMap::new(),
            kill_signal: None,
        }
    }

    pub fn update_height<Fstart>(&mut self, height: u64, start: Fstart)
    where Fstart: Fn(AssetDefinition) -> Arc<AtomicBool> {
        if let Some((&height, &involvement)) = self
            .next_states
            .iter()
            .find(|(&mined_height, _)| mined_height <= height)
        {
            // State change
            if self.current_state != involvement {
                if involvement {
                    self.kill_signal = Some(start(self.definition.clone()));
                } else {
                    // Switch on the kill signal for the asset to end processing
                    let stop = self.kill_signal.clone().unwrap();
                    stop.as_ref().store(true, Ordering::Relaxed);
                    self.kill_signal = None;
                }
            }
            self.current_state = involvement;
            // We have the current state set and we will keep only future updates
            self.next_states
                .retain(|&effective_height, _| effective_height > height);
            // Monitor this asset if we are part of the committee or there is a next state
        }
    }

    // If we are part of the committee or there is a next state then monitor this asset
    pub fn should_monitor(&self) -> bool {
        self.current_state || !self.next_states.is_empty()
    }

    pub fn add_state(&mut self, height: u64, involvement: bool) {
        self.next_states.insert(height, involvement);
    }

    pub fn is_committee_member(&self) -> bool {
        self.current_state
    }
}
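
To make the state machine above concrete, here is a minimal usage sketch (not part of the commit) that stubs `AssetDefinition` and drives an `Asset` through a join-then-leave sequence; assume it is compiled together with the `Asset` type above:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Stand-in for tari_dan_core::models::AssetDefinition, for this sketch only.
#[derive(Clone, Debug, Default)]
struct AssetDefinition;

fn main() {
    let mut asset = Asset::new(AssetDefinition::default());

    // A committee-definition TX mined at height 100 adds this node to the committee.
    asset.add_state(100, true);
    assert!(asset.should_monitor()); // a pending state means: keep watching

    // The tip reaches height 100: the closure "starts" a worker and returns its kill switch.
    let kill = Arc::new(AtomicBool::new(false));
    let kill_for_start = kill.clone();
    asset.update_height(100, move |_definition| kill_for_start.clone());
    assert!(asset.is_committee_member());

    // A later TX at height 150 removes the node again. Once the tip passes that
    // height, the kill signal stored at start-up is flipped to stop the worker.
    asset.add_state(150, false);
    asset.update_height(150, |_definition| unreachable!("no new worker is started"));
    assert!(kill.load(Ordering::Relaxed));
    assert!(!asset.should_monitor());
}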
101 changes: 61 additions & 40 deletions applications/tari_validator_node/src/dan_node.rs
@@ -20,14 +20,18 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{
+    sync::{atomic::AtomicBool, Arc},
+    time::Duration,
+};

use log::*;
use tari_common::{
    configuration::ValidatorNodeConfig,
    exit_codes::{ExitCode, ExitError},
    GlobalConfig,
};
+use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_dht::Dht;
use tari_crypto::tari_utilities::hex::Hex;
@@ -55,6 +59,7 @@ use tokio::{task, time};
use crate::{
    default_service_specification::DefaultServiceSpecification,
    grpc::services::{base_node_client::GrpcBaseNodeClient, wallet_client::GrpcWalletClient},
+    monitoring::Monitoring,
    p2p::services::{
        inbound_connection_service::TariCommsInboundConnectionService,
        outbound_connection_service::TariCommsOutboundService,
@@ -89,9 +94,9 @@ impl DanNode {
            .ok_or_else(|| ExitError::new(ExitCode::ConfigError, "Missing dan section"))?;

        let mut base_node_client = GrpcBaseNodeClient::new(dan_config.base_node_grpc_address);
-        #[allow(clippy::mutable_key_type)]
-        let mut tasks = HashMap::new();
        let mut next_scanned_height = 0u64;
+        let mut last_tip = 0u64;
+        let mut monitoring = Monitoring::new(dan_config.committee_management_confirmation_time);
        loop {
            let tip = base_node_client
                .get_tip_info()
@@ -103,12 +108,13 @@
"Scanning base layer (tip : {}) for new assets", tip.height_of_longest_chain
);
if dan_config.scan_for_assets {
next_scanned_height = tip.height_of_longest_chain + dan_config.new_asset_scanning_interval;
next_scanned_height =
tip.height_of_longest_chain + dan_config.committee_management_polling_interval;
info!(target: LOG_TARGET, "Next scanning height {}", next_scanned_height);
} else {
next_scanned_height = u64::MAX; // Never run again.
}
let assets = base_node_client
let mut assets = base_node_client
.get_assets_for_dan_node(node_identity.public_key().clone())
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
@@ -117,54 +123,64 @@
"Base node returned {} asset(s) to process",
assets.len()
);
-            for asset in assets {
-                if tasks.contains_key(&asset.public_key) {
-                    debug!(
-                        target: LOG_TARGET,
-                        "Asset task already running for asset '{}'", asset.public_key
-                    );
-                    continue;
-                }
-                if let Some(allow_list) = &dan_config.assets_allow_list {
-                    if !allow_list.contains(&asset.public_key.to_hex()) {
-                        debug!(
-                            target: LOG_TARGET,
-                            "Asset '{}' is not allowlisted for processing ", asset.public_key
-                        );
-                        continue;
+            if let Some(allow_list) = &dan_config.assets_allow_list {
+                assets.retain(|(asset, _)| allow_list.contains(&asset.public_key.to_hex()));
+            }
+            for (asset, mined_height) in assets.clone() {
+                monitoring.add_if_unmonitored(asset.clone());
+                monitoring.add_state(asset.public_key, mined_height, true);
+            }
+            let mut known_active_public_keys = assets.into_iter().map(|(asset, _)| asset.public_key);
+            let active_public_keys = monitoring
+                .get_active_public_keys()
+                .into_iter()
+                .cloned()
+                .collect::<Vec<PublicKey>>();
+            for public_key in active_public_keys {
+                if !known_active_public_keys.any(|pk| pk == public_key) {
+                    // Active asset is not part of the newly known active assets, maybe there was no checkpoint for
+                    // the asset. Are we still part of the committee?
+                    if let (false, height) = base_node_client
+                        .check_if_in_committee(public_key.clone(), node_identity.public_key().clone())
+                        .await
+                        .unwrap()
+                    {
+                        // We are not part of the latest committee, set the state to false
+                        monitoring.add_state(public_key.clone(), height, false)
+                    }
                    }
                }
-                info!(target: LOG_TARGET, "Adding asset '{}'", asset.public_key);
+            }
+            if tip.height_of_longest_chain > last_tip {
+                last_tip = tip.height_of_longest_chain;
+                monitoring.update_height(last_tip, |asset| {
                    let node_identity = node_identity.as_ref().clone();
                    let mempool = mempool_service.clone();
                    let handles = handles.clone();
                    let subscription_factory = subscription_factory.clone();
                    let shutdown = shutdown.clone();
+                    // Create a kill signal for each asset
+                    let kill = Arc::new(AtomicBool::new(false));
                    let dan_config = dan_config.clone();
                    let db_factory = db_factory.clone();
-                tasks.insert(
-                    asset.public_key.clone(),
-                    task::spawn(DanNode::start_asset_worker(
-                        asset,
-                        node_identity,
-                        mempool,
-                        handles,
-                        subscription_factory,
-                        shutdown,
-                        dan_config,
-                        db_factory,
-                    )),
-                );
-            }
+                    task::spawn(DanNode::start_asset_worker(
+                        asset,
+                        node_identity,
+                        mempool,
+                        handles,
+                        subscription_factory,
+                        shutdown,
+                        dan_config,
+                        db_factory,
+                        kill.clone(),
+                    ));
+                    kill
+                });
+            }
            time::sleep(Duration::from_secs(120)).await;
        }
    }

-    // async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
-    //     todo!()
-    // }

    pub async fn start_asset_worker(
        asset_definition: AssetDefinition,
        node_identity: NodeIdentity,
@@ -174,6 +190,7 @@
        shutdown: ShutdownSignal,
        config: ValidatorNodeConfig,
        db_factory: SqliteDbFactory,
+        kill: Arc<AtomicBool>,
    ) -> Result<(), ExitError> {
        let timeout = Duration::from_secs(asset_definition.phase_timeout);
        let committee = asset_definition
@@ -235,11 +252,15 @@
            validator_node_client_factory,
        );

-        if let Err(err) = consensus_worker.run(shutdown.clone(), None).await {
+        if let Err(err) = consensus_worker.run(shutdown.clone(), None, kill).await {
            error!(target: LOG_TARGET, "Consensus worker failed with error: {}", err);
            return Err(ExitError::new(ExitCode::UnknownError, err));
        }

        Ok(())
    }

+    // async fn start_asset_proxy(&self) -> Result<(), ExitCodes> {
+    //     todo!()
+    // }
}
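
The diff above references a new `Monitoring` type; `monitoring.rs` is among the 17 changed files but is not shown in this excerpt. A minimal sketch consistent with the call sites above (`new`, `add_if_unmonitored`, `add_state`, `get_active_public_keys`, `update_height`) might look like the following; the field names and the confirmation-time handling are assumptions, not the committed code:

use std::{
    collections::HashMap,
    sync::{atomic::AtomicBool, Arc},
};

use tari_common_types::types::PublicKey;
use tari_dan_core::models::AssetDefinition;

use crate::asset::Asset;

pub struct Monitoring {
    committee_management_confirmation_time: u64,
    assets: HashMap<PublicKey, Asset>,
}

impl Monitoring {
    pub fn new(committee_management_confirmation_time: u64) -> Self {
        Self {
            committee_management_confirmation_time,
            assets: HashMap::new(),
        }
    }

    pub fn add_if_unmonitored(&mut self, asset: AssetDefinition) {
        if !self.assets.contains_key(&asset.public_key) {
            self.assets.insert(asset.public_key.clone(), Asset::new(asset));
        }
    }

    pub fn add_state(&mut self, public_key: PublicKey, height: u64, involvement: bool) {
        // Assumption: a committee change only takes effect after the configured
        // number of confirmations on top of the mined height.
        let effective_height = height + self.committee_management_confirmation_time;
        if let Some(asset) = self.assets.get_mut(&public_key) {
            asset.add_state(effective_height, involvement);
        }
    }

    pub fn get_active_public_keys(&self) -> Vec<&PublicKey> {
        self.assets
            .iter()
            .filter(|(_, asset)| asset.is_committee_member())
            .map(|(public_key, _)| public_key)
            .collect()
    }

    pub fn update_height<Fstart>(&mut self, height: u64, start: Fstart)
    where Fstart: Fn(AssetDefinition) -> Arc<AtomicBool> {
        for asset in self.assets.values_mut() {
            asset.update_height(height, &start);
        }
        // Stop tracking assets that are neither active nor awaiting a state change.
        self.assets.retain(|_, asset| asset.should_monitor());
    }
}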