Skip to content

Commit

Permalink
feat: scan base node for constitutions (#4144)
Browse files Browse the repository at this point in the history
Description
---
Check for constitutions that the VN is part of.

Motivation and Context
---
For a VN to become part of a committee or help manage constitutions it must be able to find relevant constitutions it is a member, or proposed member of.

How Has This Been Tested?
---
It hasn't but it will be, and if this text is still here then maybe don't merge it yet.
  • Loading branch information
brianp authored May 27, 2022
1 parent b4991a4 commit 310a2d2
Show file tree
Hide file tree
Showing 19 changed files with 232 additions and 15 deletions.
7 changes: 7 additions & 0 deletions applications/tari_app_grpc/proto/base_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ service BaseNode {
rpc GetTokens(GetTokensRequest) returns (stream GetTokensResponse);
rpc ListAssetRegistrations(ListAssetRegistrationsRequest) returns (stream ListAssetRegistrationsResponse);
rpc GetAssetMetadata(GetAssetMetadataRequest) returns (GetAssetMetadataResponse);

// Get all constitutions where the public key is in the committee
rpc GetConstitutions(GetConstitutionsRequest) returns (stream TransactionOutput);
}

message GetAssetMetadataRequest {
Expand Down Expand Up @@ -441,3 +444,7 @@ message MempoolStatsResponse {
uint64 reorg_txs = 3;
uint64 total_weight = 4;
}

message GetConstitutionsRequest {
bytes dan_node_public_key = 1;
}
4 changes: 4 additions & 0 deletions applications/tari_app_grpc/proto/validator_node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ service ValidatorNode {
// rpc ExecuteInstruction(ExecuteInstructionRequest) returns (ExecuteInstructionResponse);
rpc InvokeReadMethod(InvokeReadMethodRequest) returns (InvokeReadMethodResponse);
rpc InvokeMethod(InvokeMethodRequest) returns (InvokeMethodResponse);
rpc GetCommitteeRequests(GetCommitteeRequestsRequest) returns (stream TransactionOutput);
}

message GetCommitteeRequestsRequest {
// empty
}

message GetMetadataRequest {
// empty
Expand Down
64 changes: 64 additions & 0 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub async fn get_heights(
impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;
type GetBlocksStream = mpsc::Receiver<Result<tari_rpc::HistoricalBlock, Status>>;
type GetConstitutionsStream = mpsc::Receiver<Result<tari_rpc::TransactionOutput, Status>>;
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
Expand Down Expand Up @@ -1825,6 +1826,69 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {

Ok(Response::new(response))
}

async fn get_constitutions(
&self,
request: Request<tari_rpc::GetConstitutionsRequest>,
) -> Result<Response<Self::GetConstitutionsStream>, Status> {
let report_error_flag = self.report_error_flag();
let request = request.into_inner();
let dan_node_public_key = PublicKey::from_bytes(&request.dan_node_public_key).map_err(|err| {
report_error(
report_error_flag,
Status::invalid_argument(format!("Dan node public key is not a valid public key:{}", err)),
)
})?;

let mut handler = self.node_service.clone();
let (mut sender, receiver) = mpsc::channel(50);
task::spawn(async move {
let dan_node_public_key_hex = dan_node_public_key.to_hex();
debug!(
target: LOG_TARGET,
"Starting thread to process GetConstitutions: dan_node_public_key: {}", dan_node_public_key_hex,
);
let constitutions = match handler.get_constitutions(dan_node_public_key).await {
Ok(constitutions) => constitutions,
Err(err) => {
warn!(target: LOG_TARGET, "Error communicating with base node: {:?}", err,);
let _get_token_response =
sender.send(Err(report_error(report_error_flag, Status::internal("Internal error"))));
return;
},
};

debug!(
target: LOG_TARGET,
"Found {} constitutions for {}",
constitutions.len(),
dan_node_public_key_hex
);

for constitution in constitutions {
match sender.send(Ok(constitution.into())).await {
Ok(_) => (),
Err(err) => {
warn!(target: LOG_TARGET, "Error sending constitution via GRPC: {}", err);
match sender
.send(Err(report_error(
report_error_flag,
Status::unknown("Error sending data"),
)))
.await
{
Ok(_) => (),
Err(send_err) => {
warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err)
},
}
return;
},
}
}
});
Ok(Response::new(receiver))
}
}

enum BlockGroupType {
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_validator_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct ValidatorNodeConfig {
pub p2p: P2pConfig,
pub committee_management_polling_interval: u64,
pub committee_management_confirmation_time: u64,
pub grpc_address: Option<Multiaddr>,
}

impl ValidatorNodeConfig {
Expand Down Expand Up @@ -98,6 +99,7 @@ impl Default for ValidatorNodeConfig {
committee_management_confirmation_time: 10,
committee_management_polling_interval: 5,
p2p: P2pConfig::default(),
grpc_address: Some("/ip4/127.0.0.1/tcp/18144".parse().unwrap()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl DanNode {
let tip = base_node_client
.get_tip_info()
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, e))?;
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;
if tip.height_of_longest_chain >= next_scanned_height {
info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use async_trait::async_trait;
use log::*;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::{PublicKey, COMMITTEE_DEFINITION_ID};
use tari_core::transactions::transaction_components::TransactionOutput;
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
use tari_dan_core::{
models::{AssetDefinition, BaseLayerMetadata, BaseLayerOutput},
Expand Down Expand Up @@ -99,6 +100,22 @@ impl BaseNodeClient for GrpcBaseNodeClient {
Ok(output)
}

async fn check_for_constitutions_for_me(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
let inner = self.connection().await?;
let request = grpc::GetConstitutionsRequest {
dan_node_public_key: dan_node_public_key.as_bytes().to_vec(),
};
let mut result = inner.get_constitutions(request).await?.into_inner();
let mut outputs = vec![];
while let Some(output) = result.message().await? {
outputs.push(output.try_into().map_err(DigitalAssetError::ConversionError)?);
}
Ok(outputs)
}

async fn check_if_in_committee(
&mut self,
asset_public_key: PublicKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::convert::TryInto;
use std::{convert::TryInto, time::Duration};

use tari_app_grpc::tari_rpc as rpc;
use futures::channel::mpsc;
use tari_app_grpc::tari_rpc::{self as rpc, TransactionOutput};
use tari_common_types::types::PublicKey;
use tari_comms::NodeIdentity;
use tari_crypto::tari_utilities::ByteArray;
Expand All @@ -30,8 +31,11 @@ use tari_dan_core::{
services::{AssetProcessor, AssetProxy, ServiceSpecification},
storage::DbFactory,
};
use tokio::{task, time};
use tonic::{Request, Response, Status};

const _LOG_TARGET: &str = "tari::validator_node::grpc";

pub struct ValidatorNodeGrpcServer<TServiceSpecification: ServiceSpecification> {
node_identity: NodeIdentity,
db_factory: TServiceSpecification::DbFactory,
Expand Down Expand Up @@ -59,6 +63,27 @@ impl<TServiceSpecification: ServiceSpecification> ValidatorNodeGrpcServer<TServi
impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_server::ValidatorNode
for ValidatorNodeGrpcServer<TServiceSpecification>
{
type GetCommitteeRequestsStream = mpsc::Receiver<Result<TransactionOutput, tonic::Status>>;

async fn get_committee_requests(
&self,
_request: tonic::Request<rpc::GetCommitteeRequestsRequest>,
) -> Result<Response<Self::GetCommitteeRequestsStream>, tonic::Status> {
let (mut _sender, receiver) = mpsc::channel(100);
task::spawn(async move {
let mut _test = 1u64;
loop {
let _ = time::sleep(Duration::from_secs(1)).await;
// if let Err(err) = sender.send(Ok(ContractConstitution { test })).await {
// info!(target: LOG_TARGET, "The request was aborted, {}", err);
// break;
// }
_test += 1;
}
});
Ok(Response::new(receiver))
}

async fn get_identity(
&self,
_request: tonic::Request<rpc::GetIdentityRequest>,
Expand Down Expand Up @@ -129,6 +154,7 @@ impl<TServiceSpecification: ServiceSpecification + 'static> rpc::validator_node_
&self,
request: Request<rpc::InvokeReadMethodRequest>,
) -> Result<Response<rpc::InvokeReadMethodResponse>, Status> {
println!("invoke_read_method grpc call");
println!("{:?}", request);
let request = request.into_inner();
let asset_public_key = PublicKey::from_bytes(&request.asset_public_key)
Expand Down
24 changes: 15 additions & 9 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ mod grpc;
mod monitoring;
mod p2p;

use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
process,
sync::Arc,
};
use std::{process, sync::Arc};

use clap::Parser;
use futures::FutureExt;
Expand All @@ -47,7 +43,12 @@ use tari_common::{
exit_codes::{ExitCode, ExitError},
load_configuration,
};
use tari_comms::{peer_manager::PeerFeatures, NodeIdentity};
use tari_comms::{
multiaddr::Multiaddr,
peer_manager::PeerFeatures,
utils::multiaddr::multiaddr_to_socketaddr,
NodeIdentity,
};
use tari_comms_dht::Dht;
use tari_dan_core::services::{ConcreteAssetProcessor, ConcreteAssetProxy, MempoolServiceHandle, ServiceSpecification};
use tari_dan_storage_sqlite::SqliteDbFactory;
Expand Down Expand Up @@ -141,9 +142,10 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> {
asset_processor,
asset_proxy,
);
let grpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 18144);

task::spawn(run_grpc(grpc_server, grpc_addr, shutdown.to_signal()));
if let Some(address) = config.validator_node.grpc_address.clone() {
task::spawn(run_grpc(grpc_server, address, shutdown.to_signal()));
}
println!("🚀 Validator node started!");
println!("{}", node_identity);
run_dan_node(
Expand Down Expand Up @@ -190,11 +192,14 @@ async fn run_dan_node(

async fn run_grpc<TServiceSpecification: ServiceSpecification + 'static>(
grpc_server: ValidatorNodeGrpcServer<TServiceSpecification>,
grpc_address: SocketAddr,
grpc_address: Multiaddr,
shutdown_signal: ShutdownSignal,
) -> Result<(), anyhow::Error> {
println!("Starting GRPC on {}", grpc_address);
info!(target: LOG_TARGET, "Starting GRPC on {}", grpc_address);

let grpc_address = multiaddr_to_socketaddr(&grpc_address)?;

Server::builder()
.add_service(ValidatorNodeServer::new(grpc_server))
.serve_with_shutdown(grpc_address, shutdown_signal.map(|_| ()))
Expand All @@ -204,6 +209,7 @@ async fn run_grpc<TServiceSpecification: ServiceSpecification + 'static>(
err
})?;

println!("Stopping GRPC");
info!(target: LOG_TARGET, "Stopping GRPC");
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub enum NodeCommsRequest {
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
FetchConstitutions {
dan_node_public_key: PublicKey,
},
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -119,6 +122,9 @@ impl Display for NodeCommsRequest {
FetchMempoolTransactionsByExcessSigs { .. } => {
write!(f, "FetchMempoolTransactionsByExcessSigs")
},
FetchConstitutions { .. } => {
write!(f, "FetchConstitutions")
},
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub enum NodeCommsResponse {
output: Box<Option<UtxoMinedInfo>>,
},
FetchMempoolTransactionsByExcessSigsResponse(FetchMempoolTransactionsResponse),
FetchConstitutionsResponse {
outputs: Vec<TransactionOutput>,
},
}

impl Display for NodeCommsResponse {
Expand Down Expand Up @@ -103,6 +106,7 @@ impl Display for NodeCommsResponse {
resp.transactions.len(),
resp.not_found.len()
),
FetchConstitutionsResponse { .. } => write!(f, "FetchConstitutionsResponse"),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ where B: BlockchainBackend + 'static
}
Ok(NodeCommsResponse::FetchTokensResponse { outputs })
},
NodeCommsRequest::FetchConstitutions { dan_node_public_key } => {
debug!(target: LOG_TARGET, "Starting fetch constitutions");
Ok(NodeCommsResponse::FetchConstitutionsResponse {
outputs: self.blockchain_db.fetch_all_constitutions(dan_node_public_key).await?,
})
},
NodeCommsRequest::FetchAssetRegistrations { range } => {
let top_level_pubkey = PublicKey::default();
#[allow(clippy::range_plus_one)]
Expand Down
14 changes: 14 additions & 0 deletions base_layer/core/src/base_node/comms_interface/local_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,18 @@ impl LocalNodeCommsInterface {
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}

pub async fn get_constitutions(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, CommsInterfaceError> {
match self
.request_sender
.call(NodeCommsRequest::FetchConstitutions { dan_node_public_key })
.await??
{
NodeCommsResponse::FetchConstitutionsResponse { outputs } => Ok(outputs),
_ => Err(CommsInterfaceError::UnexpectedApiResponse),
}
}
}
2 changes: 2 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {

make_async_fn!(utxo_count() -> usize, "utxo_count");

make_async_fn!(fetch_all_constitutions(dan_node_public_key: PublicKey) -> Vec<TransactionOutput>, "fetch_all_constitutions");

//---------------------------------- Kernel --------------------------------------------//
make_async_fn!(fetch_kernel_by_excess_sig(excess_sig: Signature) -> Option<(TransactionKernel, HashOutput)>, "fetch_kernel_by_excess_sig");

Expand Down
7 changes: 6 additions & 1 deletion base_layer/core/src/chain_storage/blockchain_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
Reorg,
UtxoMinedInfo,
},
transactions::transaction_components::{TransactionInput, TransactionKernel},
transactions::transaction_components::{TransactionInput, TransactionKernel, TransactionOutput},
};

/// Identify behaviour for Blockchain database backends. Implementations must support `Send` and `Sync` so that
Expand Down Expand Up @@ -138,6 +138,11 @@ pub trait BlockchainBackend: Send + Sync {
range: Range<usize>,
) -> Result<Vec<UtxoMinedInfo>, ChainStorageError>;

fn fetch_all_constitutions(
&self,
dan_node_public_key: &PublicKey,
) -> Result<Vec<TransactionOutput>, ChainStorageError>;

/// Fetch all outputs in a block
fn fetch_outputs_in_block(&self, header_hash: &HashOutput) -> Result<Vec<PrunedOutput>, ChainStorageError>;

Expand Down
Loading

0 comments on commit 310a2d2

Please sign in to comment.