Skip to content

Commit

Permalink
feat(validator_node): add global db (#4210)
Browse files Browse the repository at this point in the history
Description
---
This PR intorduces a new global database for the validator node. This is so it can store relevant state to help process contracts such as:
- the constitutions it's a part of so it can query acceptances
- the last scanned block hash so we don't always scan from genesis

Motivation and Context
---
Everytime the VN would scan for constitutions it would do so from the genesis block. It would then re-accept the same constitutions on every loop. We had no way to keep track of already scanned block, or what constitutions we're a part of. Long term we needed a solution and not a bandaid so we introduced a global state db.

How Has This Been Tested?
---
Manually. Ensuring the VN no longer eager accepts constitutions non-stop. (not actually tested yet).

* feat(blockchain-db): add {block_hash, output_type, output_hash} index to contract_index

* docs(chain_storage/lmdb-db): document mapping for each lmdb database

* feat(chain-storage): replace fetch_all_constitutions with fetch_contract_outputs_for_block

* test(chain-storage): add unit tests for fetch_contract_outputs_for_block

* feat(base-node-service): replace get_constitutions with get_contract_outputs_for_block

* feat(base-node-grpc): use get_contract_outputs_for_block to fetch constitutions

* test(cucumber): mark `Publish contract acceptance` as broken

* test(cucumber):           fix command to publish contract utxos

* fix(chain-storage): fetch_chain_headers should return empty vec if query is out of range

* Introduce the VN global db

This db is for the validator node to store global state information and
data it needs to process information. For example:
 - the constitutions it's a part of so it can query acceptances
 - the last scanned block hash so we don't always scan from genesis

* Intro the db to the VN

* Fix default relative path

* Use the latest block hash when scanning

* Refactor

* Update the defaults

* Ensure we commit the transaction to save the last hash

* Dont include the last scanned block in the next batch

Co-authored-by: Stanimal <sdbondi@users.noreply.github.com>
  • Loading branch information
brianp and sdbondi authored Jun 20, 2022
1 parent 0fcde31 commit 3965267
Show file tree
Hide file tree
Showing 22 changed files with 460 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {

loop {
let headers = match node_service
.get_headers(current_height..=current_height + BATCH_SIZE)
.get_headers((current_height + 1)..=current_height + BATCH_SIZE)
.await
{
Ok(h) => h,
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_validator_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ impl Default for ValidatorNodeConfig {
scan_for_assets: true,
new_asset_scanning_interval: 10,
assets_allow_list: None,
data_dir: PathBuf::from("/data/validator_node"),
data_dir: PathBuf::from("data/validator_node"),
constitution_auto_accept: false,
constitution_management_confirmation_time: 10,
constitution_management_polling_interval: 5,
constitution_management_confirmation_time: 20,
constitution_management_polling_interval: 120,
p2p,
grpc_address: Some("/ip4/127.0.0.1/tcp/18144".parse().unwrap()),
}
Expand Down
76 changes: 43 additions & 33 deletions applications/tari_validator_node/src/dan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use log::{error, info};
use tari_common::exit_codes::{ExitCode, ExitError};
use tari_common_types::types::Signature;
use tari_comms::NodeIdentity;
use tari_dan_core::services::{BaseNodeClient, WalletClient};
use tari_dan_core::{
services::{BaseNodeClient, WalletClient},
storage::global::GlobalDb,
};
use tari_dan_storage_sqlite::SqliteGlobalDbBackendAdapter;
use tokio::{task, time};

use crate::{
Expand All @@ -40,29 +44,30 @@ const _LOG_TARGET: &str = "tari::validator_node::app";
pub struct DanNode {
config: ValidatorNodeConfig,
identity: Arc<NodeIdentity>,
global_db: Arc<GlobalDb<SqliteGlobalDbBackendAdapter>>,
}

impl DanNode {
pub fn new(config: ValidatorNodeConfig, identity: Arc<NodeIdentity>) -> Self {
Self { config, identity }
pub fn new(
config: ValidatorNodeConfig,
identity: Arc<NodeIdentity>,
global_db: Arc<GlobalDb<SqliteGlobalDbBackendAdapter>>,
) -> Self {
Self {
config,
identity,
global_db,
}
}

pub async fn start(&self) -> Result<(), ExitError> {
let mut base_node_client = GrpcBaseNodeClient::new(self.config.base_node_grpc_address);
let mut last_tip = 0u64;
let base_node_client = GrpcBaseNodeClient::new(self.config.base_node_grpc_address);
let node = self.clone();

if self.config.constitution_auto_accept {
task::spawn(async move {
loop {
if let Ok(metadata) = base_node_client.get_tip_info().await {
last_tip = metadata.height_of_longest_chain;
}

match node
.find_and_accept_constitutions(base_node_client.clone(), last_tip)
.await
{
match node.find_and_accept_constitutions(base_node_client.clone()).await {
Ok(()) => info!("Contracts accepted"),
Err(e) => error!("Contracts not accepted because {:?}", e),
}
Expand All @@ -82,38 +87,43 @@ impl DanNode {
}
}

async fn find_and_accept_constitutions(
&self,
mut base_node_client: GrpcBaseNodeClient,
last_tip: u64,
) -> Result<(), ExitError> {
async fn find_and_accept_constitutions(&self, mut base_node_client: GrpcBaseNodeClient) -> Result<(), ExitError> {
let mut wallet_client = GrpcWalletClient::new(self.config.wallet_grpc_address);
let metadata_key_name = "last_scanned_constitution_hash".as_bytes();

let outputs = base_node_client
.get_constitutions(self.identity.public_key().clone())
let last_hash = match self.global_db.get_data(metadata_key_name) {
Ok(Some(h)) => h,
_ => vec![],
};

let (outputs, latest_hash) = base_node_client
.get_constitutions(self.identity.public_key().clone(), last_hash)
.await
.map_err(|e| ExitError::new(ExitCode::DigitalAssetError, &e))?;

let outputs_len = outputs.len();

for output in outputs {
if let Some(sidechain_features) = output.features.sidechain_features {
let contract_id = sidechain_features.contract_id;
// TODO: expect will crash the validator node if the base node misbehaves
let constitution = sidechain_features.constitution.expect("Constitution wasn't present");

if constitution.acceptance_requirements.acceptance_period_expiry < last_tip {
let signature = Signature::default();

match wallet_client
.submit_contract_acceptance(&contract_id, self.identity.public_key(), &signature)
.await
{
Ok(tx_id) => info!("Accepted with id={}", tx_id),
Err(_) => error!("Did not accept the contract acceptance"),
};
let signature = Signature::default();

match wallet_client
.submit_contract_acceptance(&contract_id, self.identity.public_key(), &signature)
.await
{
Ok(tx_id) => info!("Accepted with id={}", tx_id),
Err(_) => error!("Did not accept the contract acceptance"),
};
}
}

if outputs_len > 0 {
self.global_db
.set_data(metadata_key_name, &latest_hash)
.map_err(|e| ExitError::new(ExitCode::DatabaseError, e))?;
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{convert::TryInto, net::SocketAddr};
use async_trait::async_trait;
use log::*;
use tari_app_grpc::tari_rpc as grpc;
use tari_common_types::types::{PublicKey, COMMITTEE_DEFINITION_ID};
use tari_common_types::types::{BlockHash, PublicKey, COMMITTEE_DEFINITION_ID};
use tari_core::transactions::transaction_components::TransactionOutput;
use tari_crypto::tari_utilities::{hex::Hex, ByteArray};
use tari_dan_core::{
Expand Down Expand Up @@ -103,22 +103,25 @@ impl BaseNodeClient for GrpcBaseNodeClient {
async fn get_constitutions(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
block_hash: Vec<u8>,
) -> Result<(Vec<TransactionOutput>, BlockHash), DigitalAssetError> {
let inner = self.connection().await?;
let request = grpc::GetConstitutionsRequest {
// TODO: pass in the last block hash that was scanned
start_block_hash: vec![],
start_block_hash: block_hash,
dan_node_public_key: dan_node_public_key.as_bytes().to_vec(),
};
let mut result = inner.get_constitutions(request).await?.into_inner();
let mut outputs = vec![];
let mut last_hash: BlockHash = vec![];
while let Some(mined_info) = result.message().await? {
let output = mined_info
.output
.ok_or_else(|| DigitalAssetError::InvalidPeerMessage("Mined info contained no output".to_string()))?;
outputs.push(output.try_into().map_err(DigitalAssetError::ConversionError)?);
last_hash = mined_info.header_hash;
}
Ok(outputs)
Ok((outputs, last_hash))
}

async fn check_if_in_committee(
Expand Down
25 changes: 20 additions & 5 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ use tari_comms::{
NodeIdentity,
};
use tari_comms_dht::Dht;
use tari_dan_core::services::{ConcreteAssetProcessor, ConcreteAssetProxy, MempoolServiceHandle, ServiceSpecification};
use tari_dan_storage_sqlite::SqliteDbFactory;
use tari_dan_core::{
services::{ConcreteAssetProcessor, ConcreteAssetProxy, MempoolServiceHandle, ServiceSpecification},
storage::{global::GlobalDb, DbFactory},
};
use tari_dan_storage_sqlite::{SqliteDbFactory, SqliteGlobalDbBackendAdapter};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{runtime, runtime::Runtime, task};
use tonic::transport::Server;
Expand Down Expand Up @@ -112,6 +115,9 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> {
PeerFeatures::NONE,
)?;
let db_factory = SqliteDbFactory::new(config.validator_node.data_dir.clone());
let global_db = db_factory
.get_or_create_global_db()
.map_err(|e| ExitError::new(ExitCode::DatabaseError, e))?;
let mempool_service = MempoolServiceHandle::default();

info!(
Expand Down Expand Up @@ -158,7 +164,12 @@ async fn run_node(config: &ApplicationConfig) -> Result<(), ExitError> {
println!("🚀 Validator node started!");
println!("{}", node_identity);

run_dan_node(config.validator_node.clone(), node_identity.clone()).await?;
run_dan_node(
config.validator_node.clone(),
node_identity.clone(),
Arc::new(global_db),
)
.await?;

Ok(())
}
Expand All @@ -171,8 +182,12 @@ fn build_runtime() -> Result<Runtime, ExitError> {
.map_err(|e| ExitError::new(ExitCode::UnknownError, e))
}

async fn run_dan_node(config: ValidatorNodeConfig, node_identity: Arc<NodeIdentity>) -> Result<(), ExitError> {
let node = DanNode::new(config, node_identity);
async fn run_dan_node(
config: ValidatorNodeConfig,
node_identity: Arc<NodeIdentity>,
global_db: Arc<GlobalDb<SqliteGlobalDbBackendAdapter>>,
) -> Result<(), ExitError> {
let node = DanNode::new(config, node_identity, global_db);
node.start().await
}

Expand Down
5 changes: 3 additions & 2 deletions dan_layer/core/src/services/base_node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use async_trait::async_trait;
use tari_common_types::types::PublicKey;
use tari_common_types::types::{BlockHash, PublicKey};
use tari_core::transactions::transaction_components::TransactionOutput;

use crate::{
Expand All @@ -43,7 +43,8 @@ pub trait BaseNodeClient: Send + Sync {
async fn get_constitutions(
&mut self,
dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError>;
block_hash: Vec<u8>,
) -> Result<(Vec<TransactionOutput>, BlockHash), DigitalAssetError>;

async fn check_if_in_committee(
&mut self,
Expand Down
5 changes: 3 additions & 2 deletions dan_layer/core/src/services/mocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
};

use async_trait::async_trait;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_common_types::types::{BlockHash, FixedHash, PublicKey};
use tari_core::transactions::transaction_components::TransactionOutput;

use super::CommitteeManager;
Expand Down Expand Up @@ -224,7 +224,8 @@ impl BaseNodeClient for MockBaseNodeClient {
async fn get_constitutions(
&mut self,
_dan_node_public_key: PublicKey,
) -> Result<Vec<TransactionOutput>, DigitalAssetError> {
_block_hash: Vec<u8>,
) -> Result<(Vec<TransactionOutput>, BlockHash), DigitalAssetError> {
todo!();
}
}
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/core/src/storage/db_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use tari_common_types::types::PublicKey;

use crate::storage::{
chain::{ChainDb, ChainDbBackendAdapter},
global::{GlobalDb, GlobalDbBackendAdapter},
state::{StateDb, StateDbBackendAdapter},
StorageError,
};

pub trait DbFactory: Sync + Send + 'static {
type ChainDbBackendAdapter: ChainDbBackendAdapter;
type StateDbBackendAdapter: StateDbBackendAdapter;
type GlobalDbBackendAdapter: GlobalDbBackendAdapter;

fn get_chain_db(
&self,
Expand All @@ -51,4 +53,6 @@ pub trait DbFactory: Sync + Send + 'static {
&self,
asset_public_key: &PublicKey,
) -> Result<StateDb<Self::StateDbBackendAdapter>, StorageError>;

fn get_or_create_global_db(&self) -> Result<GlobalDb<Self::GlobalDbBackendAdapter>, StorageError>;
}
43 changes: 43 additions & 0 deletions dan_layer/core/src/storage/global/global_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::storage::{global::GlobalDbBackendAdapter, StorageError};

pub struct GlobalDb<TGlobalDbBackendAdapter> {
adapter: TGlobalDbBackendAdapter,
}

impl<TGlobalDbBackendAdapter: GlobalDbBackendAdapter> GlobalDb<TGlobalDbBackendAdapter> {
pub fn new(adapter: TGlobalDbBackendAdapter) -> Self {
Self { adapter }
}

pub fn set_data(&self, key: &[u8], value: &[u8]) -> Result<(), StorageError> {
self.adapter
.set_data(key, value)
.map_err(TGlobalDbBackendAdapter::Error::into)
}

pub fn get_data(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StorageError> {
self.adapter.get_data(key).map_err(TGlobalDbBackendAdapter::Error::into)
}
}
33 changes: 33 additions & 0 deletions dan_layer/core/src/storage/global/global_db_backend_adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::storage::StorageError;

pub trait GlobalDbBackendAdapter: Send + Sync + Clone {
type BackendTransaction;
type Error: Into<StorageError>;

fn create_transaction(&self) -> Result<Self::BackendTransaction, Self::Error>;
fn commit(&self, tx: &Self::BackendTransaction) -> Result<(), Self::Error>;
fn get_data(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
fn set_data(&self, key: &[u8], value: &[u8]) -> Result<(), Self::Error>;
}
26 changes: 26 additions & 0 deletions dan_layer/core/src/storage/global/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
mod global_db;
pub use global_db::GlobalDb;

mod global_db_backend_adapter;
pub use global_db_backend_adapter::GlobalDbBackendAdapter;
Loading

0 comments on commit 3965267

Please sign in to comment.