From 16e07a0b4ab9079f84645d8796a4fc6bb27f0303 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 7 Jul 2022 12:20:02 +0200 Subject: [PATCH] feat(dan_layer/core): track checkpoint number for each checkpoint submitted (#4268) Description --- - tracks the checkpoint number for a contract - adds a general-purpose metadata table to the chain db - adds `MetadataBackendAdapter`, which specifies an interface for databases that support getting and setting metadata Motivation and Context --- This PR persists the checkpoint number each time a checkpoint is submitted so that checkpoints are submitted with sequential numbers. How Has This Been Tested? --- Manually - VN submits sequential checkpoints --- .../src/grpc/wallet_grpc_server.rs | 2 +- .../src/grpc/services/wallet_client.rs | 5 +- .../core/src/services/checkpoint_manager.rs | 36 ++-- .../src/services/service_specification.rs | 10 +- dan_layer/core/src/storage/atomic_db.rs | 32 ++++ dan_layer/core/src/storage/chain/chain_db.rs | 43 ++++- .../storage/chain/chain_db_backend_adapter.rs | 27 +-- .../core/src/storage/chain/metadata_key.rs | 44 +++++ dan_layer/core/src/storage/chain/mod.rs | 3 + .../src/storage/metadata_backend_adapter.rs | 47 +++++ dan_layer/core/src/storage/mocks/chain_db.rs | 85 +++++++-- dan_layer/core/src/storage/mocks/mod.rs | 3 +- dan_layer/core/src/storage/mod.rs | 4 + .../core/src/workers/consensus_worker.rs | 15 +- .../core/src/workers/states/commit_state.rs | 58 +++--- dan_layer/storage_sqlite/Cargo.toml | 12 +- .../down.sql | 1 + .../2022-07-05-131723_create_metadata/up.sql | 5 + dan_layer/storage_sqlite/src/error.rs | 2 + .../src/global/models/metadata.rs | 2 +- .../sqlite_global_db_backend_adapter.rs | 8 +- .../storage_sqlite/src/models/metadata.rs | 31 ++++ dan_layer/storage_sqlite/src/models/mod.rs | 1 + dan_layer/storage_sqlite/src/schema.rs | 40 +++-- .../src/sqlite_chain_backend_adapter.rs | 170 +++++++++++++----- 25 files changed, 511 insertions(+), 175 deletions(-) create mode 100644 dan_layer/core/src/storage/atomic_db.rs create mode 100644 dan_layer/core/src/storage/chain/metadata_key.rs create mode 100644 dan_layer/core/src/storage/metadata_backend_adapter.rs create mode 100644 dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/down.sql create mode 100644 dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/up.sql create mode 100644 dan_layer/storage_sqlite/src/models/metadata.rs diff --git a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs index d8df91db53..0b12a95e7a 100644 --- a/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/tari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -884,7 +884,7 @@ impl wallet_server::Wallet for WalletGrpcServer { .await .map_err(|e| Status::internal(e.to_string()))?; - let message = format!("Sidechain state checkpoint for {}", contract_id); + let message = format!("Checkpoint #{} for {}", checkpoint_number, contract_id); transaction_service .submit_transaction(tx_id, transaction, 10.into(), message) .await diff --git a/applications/tari_validator_node/src/grpc/services/wallet_client.rs b/applications/tari_validator_node/src/grpc/services/wallet_client.rs index 4dd1c45710..8c8d6feb68 100644 --- a/applications/tari_validator_node/src/grpc/services/wallet_client.rs +++ b/applications/tari_validator_node/src/grpc/services/wallet_client.rs @@ -23,7 +23,6 @@ use std::net::SocketAddr; use async_trait::async_trait; -use log::*; use 
tari_app_grpc::{ tari_rpc as grpc, tari_rpc::{ @@ -39,7 +38,7 @@ use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{services::WalletClient, DigitalAssetError}; use tari_dan_engine::state::models::StateRoot; -const LOG_TARGET: &str = "tari::dan::wallet_grpc"; +const _LOG_TARGET: &str = "tari::dan::wallet_grpc"; type Inner = grpc::wallet_client::WalletClient; @@ -80,8 +79,6 @@ impl WalletClient for GrpcWalletClient { signatures: checkpoint_signatures.into_iter().map(Into::into).collect(), }; - info!(target: LOG_TARGET, "✅ Creating checkpoint #{}", checkpoint_number); - if checkpoint_number == 0 { let request = CreateInitialAssetCheckpointRequest { contract_id: contract_id.to_vec(), diff --git a/dan_layer/core/src/services/checkpoint_manager.rs b/dan_layer/core/src/services/checkpoint_manager.rs index d60a025d14..e4f1c308ae 100644 --- a/dan_layer/core/src/services/checkpoint_manager.rs +++ b/dan_layer/core/src/services/checkpoint_manager.rs @@ -33,6 +33,7 @@ const LOG_TARGET: &str = "tari::dan::checkpoint_manager"; pub trait CheckpointManager { async fn create_checkpoint( &mut self, + checkpoint_number: u64, state_root: StateRoot, signature: Vec, ) -> Result<(), DigitalAssetError>; @@ -42,8 +43,6 @@ pub trait CheckpointManager { pub struct ConcreteCheckpointManager { asset_definition: AssetDefinition, wallet: TWallet, - num_calls: u32, - checkpoint_interval: u32, } impl ConcreteCheckpointManager { @@ -51,8 +50,6 @@ impl ConcreteCheckpointManager { Self { asset_definition, wallet, - num_calls: 0, - checkpoint_interval: 100, } } } @@ -61,26 +58,23 @@ impl ConcreteCheckpointManager { impl CheckpointManager for ConcreteCheckpointManager { async fn create_checkpoint( &mut self, + checkpoint_number: u64, state_root: StateRoot, signatures: Vec, ) -> Result<(), DigitalAssetError> { - if self.num_calls == 0 || self.num_calls >= self.checkpoint_interval { - // TODO: fetch and increment checkpoint number - let checkpoint_number = u64::from(self.num_calls / self.checkpoint_interval); - info!( - target: LOG_TARGET, - "Creating checkpoint for contract {}", self.asset_definition.contract_id - ); - self.wallet - .create_new_checkpoint( - &self.asset_definition.contract_id, - &state_root, - checkpoint_number, - signatures, - ) - .await?; - } - self.num_calls += 1; + info!( + target: LOG_TARGET, + "✅ Creating checkpoint #{} for contract {}", checkpoint_number, self.asset_definition.contract_id + ); + + self.wallet + .create_new_checkpoint( + &self.asset_definition.contract_id, + &state_root, + checkpoint_number, + signatures, + ) + .await?; Ok(()) } } diff --git a/dan_layer/core/src/services/service_specification.rs b/dan_layer/core/src/services/service_specification.rs index 5c073ebbc8..7b6586769e 100644 --- a/dan_layer/core/src/services/service_specification.rs +++ b/dan_layer/core/src/services/service_specification.rs @@ -39,7 +39,13 @@ use crate::{ SigningService, ValidatorNodeClientFactory, }, - storage::{chain::ChainDbBackendAdapter, global::GlobalDbBackendAdapter, ChainStorageService, DbFactory}, + storage::{ + chain::{ChainDbBackendAdapter, ChainDbMetadataKey}, + global::GlobalDbBackendAdapter, + ChainStorageService, + DbFactory, + MetadataBackendAdapter, + }, }; /// A trait to describe a specific configuration of services. 
This type allows other services to @@ -50,7 +56,7 @@ pub trait ServiceSpecification: Default + Clone { type AssetProcessor: AssetProcessor + Clone; type AssetProxy: AssetProxy + Clone; type BaseNodeClient: BaseNodeClient + Clone; - type ChainDbBackendAdapter: ChainDbBackendAdapter; + type ChainDbBackendAdapter: ChainDbBackendAdapter + MetadataBackendAdapter; type ChainStorageService: ChainStorageService; type CheckpointManager: CheckpointManager; type CommitteeManager: CommitteeManager; diff --git a/dan_layer/core/src/storage/atomic_db.rs b/dan_layer/core/src/storage/atomic_db.rs new file mode 100644 index 0000000000..3e1023956b --- /dev/null +++ b/dan_layer/core/src/storage/atomic_db.rs @@ -0,0 +1,32 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::storage::StorageError; + +pub trait AtomicDb { + type Error: Into; + type DbTransaction; + + fn create_transaction(&self) -> Result; + + fn commit(&self, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; +} diff --git a/dan_layer/core/src/storage/chain/chain_db.rs b/dan_layer/core/src/storage/chain/chain_db.rs index 59c85f9bfd..fc239faa82 100644 --- a/dan_layer/core/src/storage/chain/chain_db.rs +++ b/dan_layer/core/src/storage/chain/chain_db.rs @@ -24,12 +24,13 @@ use crate::{ models::{Node, QuorumCertificate, SideChainBlock, TreeNodeHash}, storage::{ - chain::{chain_db_unit_of_work::ChainDbUnitOfWorkImpl, ChainDbBackendAdapter}, + chain::{chain_db_unit_of_work::ChainDbUnitOfWorkImpl, ChainDbBackendAdapter, ChainDbMetadataKey}, + MetadataBackendAdapter, StorageError, }, }; -pub struct ChainDb { +pub struct ChainDb { adapter: TBackendAdapter, } @@ -107,6 +108,44 @@ impl ChainDb { } } +impl ChainDb +where TBackendAdapter: MetadataBackendAdapter +{ + pub fn get_current_checkpoint_number(&self) -> Result { + let tx = self + .adapter + .create_transaction() + .map_err(TBackendAdapter::Error::into)?; + let number = self + .adapter + .get_metadata(&ChainDbMetadataKey::CheckpointNumber, &tx) + .map_err(TBackendAdapter::Error::into)? 
+ .unwrap_or(0); + Ok(number) + } + + /// Increments checkpoint number and returns the incremented value. If the key did not previously exist, it + /// is created and set to 1. + pub fn increment_checkpoint_number(&self) -> Result { + let tx = self + .adapter + .create_transaction() + .map_err(TBackendAdapter::Error::into)?; + const KEY: ChainDbMetadataKey = ChainDbMetadataKey::CheckpointNumber; + let n = self + .adapter + .get_metadata::(&KEY, &tx) + .map_err(TBackendAdapter::Error::into)? + .unwrap_or(0); + let next = n + 1; + self.adapter + .set_metadata(KEY, next, &tx) + .map_err(TBackendAdapter::Error::into)?; + self.adapter.commit(&tx).map_err(TBackendAdapter::Error::into)?; + Ok(next) + } +} + impl ChainDb { pub fn new_unit_of_work(&self) -> ChainDbUnitOfWorkImpl { ChainDbUnitOfWorkImpl::new(self.adapter.clone()) diff --git a/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs b/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs index 031a8c96c6..6b0104a1fd 100644 --- a/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs +++ b/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs @@ -26,33 +26,20 @@ use crate::{ models::{Payload, QuorumCertificate, TreeNodeHash}, storage::{ chain::{DbInstruction, DbNode, DbQc}, - StorageError, + AtomicDb, }, }; -pub trait ChainDbBackendAdapter: Send + Sync + Clone { - type BackendTransaction; - type Error: Into; +pub trait ChainDbBackendAdapter: AtomicDb + Send + Sync + Clone { type Id: Copy + Send + Sync + Debug + PartialEq; type Payload: Payload; fn is_empty(&self) -> Result; - fn create_transaction(&self) -> Result; fn node_exists(&self, node_hash: &TreeNodeHash) -> Result; fn get_tip_node(&self) -> Result, Self::Error>; - fn insert_node(&self, item: &DbNode, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; - fn update_node( - &self, - id: &Self::Id, - item: &DbNode, - transaction: &Self::BackendTransaction, - ) -> Result<(), Self::Error>; - fn insert_instruction( - &self, - item: &DbInstruction, - transaction: &Self::BackendTransaction, - ) -> Result<(), Self::Error>; - fn commit(&self, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; + fn insert_node(&self, item: &DbNode, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; + fn update_node(&self, id: &Self::Id, item: &DbNode, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; + fn insert_instruction(&self, item: &DbInstruction, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; fn locked_qc_id(&self) -> Self::Id; fn prepare_qc_id(&self) -> Self::Id; fn find_highest_prepared_qc(&self) -> Result; @@ -61,6 +48,6 @@ pub trait ChainDbBackendAdapter: Send + Sync + Clone { fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error>; fn find_node_by_parent_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error>; fn find_all_instructions_by_node(&self, node_id: Self::Id) -> Result, Self::Error>; - fn update_prepare_qc(&self, item: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; - fn update_locked_qc(&self, locked_qc: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; + fn update_prepare_qc(&self, item: &DbQc, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; + fn update_locked_qc(&self, locked_qc: &DbQc, transaction: &Self::DbTransaction) -> Result<(), Self::Error>; } diff --git a/dan_layer/core/src/storage/chain/metadata_key.rs b/dan_layer/core/src/storage/chain/metadata_key.rs new file mode 100644 index 
0000000000..2bc40fba25 --- /dev/null +++ b/dan_layer/core/src/storage/chain/metadata_key.rs @@ -0,0 +1,44 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::fmt::{Display, Formatter}; + +use crate::storage::metadata_backend_adapter::AsKeyBytes; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChainDbMetadataKey { + CheckpointNumber, +} + +impl AsKeyBytes for ChainDbMetadataKey { + fn as_key_bytes(&self) -> &[u8] { + match self { + ChainDbMetadataKey::CheckpointNumber => b"checkpoint-number", + } + } +} + +impl Display for ChainDbMetadataKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", String::from_utf8_lossy(self.as_key_bytes())) + } +} diff --git a/dan_layer/core/src/storage/chain/mod.rs b/dan_layer/core/src/storage/chain/mod.rs index 9cd991c70d..c7eb62aca6 100644 --- a/dan_layer/core/src/storage/chain/mod.rs +++ b/dan_layer/core/src/storage/chain/mod.rs @@ -26,9 +26,12 @@ mod chain_db_unit_of_work; mod db_instruction; mod db_node; mod db_qc; +mod metadata_key; + pub use chain_db::ChainDb; pub use chain_db_backend_adapter::ChainDbBackendAdapter; pub use chain_db_unit_of_work::ChainDbUnitOfWork; pub use db_instruction::DbInstruction; pub use db_node::DbNode; pub use db_qc::DbQc; +pub use metadata_key::ChainDbMetadataKey; diff --git a/dan_layer/core/src/storage/metadata_backend_adapter.rs b/dan_layer/core/src/storage/metadata_backend_adapter.rs new file mode 100644 index 0000000000..6d3999c93c --- /dev/null +++ b/dan_layer/core/src/storage/metadata_backend_adapter.rs @@ -0,0 +1,47 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. 
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// TODO: probably want to use something like bors or consensus encoding +use tari_utilities::message_format::MessageFormat; + +use crate::storage::AtomicDb; + +pub trait MetadataBackendAdapter: AtomicDb + Send + Sync + Clone { + fn get_metadata( + &self, + key: &K, + transaction: &Self::DbTransaction, + ) -> Result, Self::Error>; + + fn set_metadata( + &self, + key: K, + value: T, + transaction: &Self::DbTransaction, + ) -> Result<(), Self::Error>; + + fn metadata_key_exists(&self, key: &K, transaction: &Self::DbTransaction) -> Result; +} + +pub trait AsKeyBytes { + fn as_key_bytes(&self) -> &[u8]; +} diff --git a/dan_layer/core/src/storage/mocks/chain_db.rs b/dan_layer/core/src/storage/mocks/chain_db.rs index 2ee2caa636..b6d648afe7 100644 --- a/dan_layer/core/src/storage/mocks/chain_db.rs +++ b/dan_layer/core/src/storage/mocks/chain_db.rs @@ -22,11 +22,15 @@ use std::sync::{Arc, RwLock}; +use tari_utilities::message_format::MessageFormat; + use super::MemoryChainDb; use crate::{ models::{QuorumCertificate, TreeNodeHash}, storage::{ - chain::{ChainDbBackendAdapter, DbInstruction, DbNode, DbQc}, + chain::{ChainDbBackendAdapter, ChainDbMetadataKey, DbInstruction, DbNode, DbQc}, + AtomicDb, + MetadataBackendAdapter, StorageError, }, }; @@ -42,9 +46,20 @@ impl MockChainDbBackupAdapter { } } -impl ChainDbBackendAdapter for MockChainDbBackupAdapter { - type BackendTransaction = (); +impl AtomicDb for MockChainDbBackupAdapter { + type DbTransaction = (); type Error = StorageError; + + fn create_transaction(&self) -> Result { + Ok(()) + } + + fn commit(&self, _: &Self::DbTransaction) -> Result<(), Self::Error> { + Ok(()) + } +} + +impl ChainDbBackendAdapter for MockChainDbBackupAdapter { type Id = usize; type Payload = String; @@ -59,17 +74,13 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { Ok(exists) } - fn create_transaction(&self) -> Result { - Ok(()) - } - - fn insert_node(&self, node: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn insert_node(&self, node: &DbNode, _: &Self::DbTransaction) -> Result<(), Self::Error> { let mut lock = self.db.write()?; lock.nodes.insert(node.clone()); Ok(()) } - fn update_node(&self, id: &Self::Id, item: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn update_node(&self, id: &Self::Id, item: &DbNode, _: 
&Self::DbTransaction) -> Result<(), Self::Error> { let mut lock = self.db.write()?; if lock.nodes.update(*id, item.clone()) { Ok(()) @@ -78,16 +89,12 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } } - fn insert_instruction(&self, item: &DbInstruction, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn insert_instruction(&self, item: &DbInstruction, _: &Self::DbTransaction) -> Result<(), Self::Error> { let mut lock = self.db.write()?; lock.instructions.insert(item.clone()); Ok(()) } - fn commit(&self, _: &Self::BackendTransaction) -> Result<(), Self::Error> { - Ok(()) - } - fn locked_qc_id(&self) -> Self::Id { 1 } @@ -157,7 +164,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { Ok(recs) } - fn update_prepare_qc(&self, item: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn update_prepare_qc(&self, item: &DbQc, _transaction: &Self::DbTransaction) -> Result<(), Self::Error> { let mut lock = self.db.write()?; let id = lock .prepare_qc @@ -169,7 +176,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { Ok(()) } - fn update_locked_qc(&self, locked_qc: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn update_locked_qc(&self, locked_qc: &DbQc, _transaction: &Self::DbTransaction) -> Result<(), Self::Error> { let mut lock = self.db.write()?; let id = lock .locked_qc @@ -196,3 +203,49 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { Ok(found) } } + +impl MetadataBackendAdapter for MockChainDbBackupAdapter { + fn get_metadata( + &self, + key: &ChainDbMetadataKey, + _transaction: &Self::DbTransaction, + ) -> Result, Self::Error> { + let lock = self.db.read()?; + let v = lock + .metadata + .rows() + .find(|(k, _)| k == key) + .map(|(_, v)| v) + .map(|v| T::from_binary(v).unwrap()); + Ok(v) + } + + fn set_metadata( + &self, + key: ChainDbMetadataKey, + value: T, + _transaction: &Self::DbTransaction, + ) -> Result<(), Self::Error> { + let mut lock = self.db.write()?; + let value = value.to_binary().unwrap(); + let id = lock.metadata.records().find(|(_, (k, _))| *k == key).map(|(id, _)| id); + match id { + Some(id) => { + lock.metadata.update(id, (key, value)); + }, + None => { + lock.metadata.insert((key, value)); + }, + } + + Ok(()) + } + + fn metadata_key_exists( + &self, + _key: &ChainDbMetadataKey, + _transaction: &Self::DbTransaction, + ) -> Result { + todo!() + } +} diff --git a/dan_layer/core/src/storage/mocks/mod.rs b/dan_layer/core/src/storage/mocks/mod.rs index e8f7ce4a19..e5af0da55c 100644 --- a/dan_layer/core/src/storage/mocks/mod.rs +++ b/dan_layer/core/src/storage/mocks/mod.rs @@ -32,7 +32,7 @@ use tari_common_types::types::FixedHash; use tari_dan_engine::state::{mocks::state_db::MockStateDbBackupAdapter, StateDb}; use crate::storage::{ - chain::{ChainDb, DbInstruction, DbNode, DbQc}, + chain::{ChainDb, ChainDbMetadataKey, DbInstruction, DbNode, DbQc}, global::GlobalDb, mocks::{chain_db::MockChainDbBackupAdapter, global_db::MockGlobalDbBackupAdapter}, DbFactory, @@ -106,6 +106,7 @@ pub(self) struct MemoryChainDb { pub instructions: MemoryDbTable, pub prepare_qc: MemoryDbTable, pub locked_qc: MemoryDbTable, + pub metadata: MemoryDbTable<(ChainDbMetadataKey, Vec)>, } #[derive(Debug)] diff --git a/dan_layer/core/src/storage/mod.rs b/dan_layer/core/src/storage/mod.rs index 5f085f3f03..82e2ba4369 100644 --- a/dan_layer/core/src/storage/mod.rs +++ b/dan_layer/core/src/storage/mod.rs @@ -28,8 +28,12 @@ mod chain_storage_service; mod db_factory; mod error; pub mod global; 
+mod metadata_backend_adapter; mod store; +pub use atomic_db::AtomicDb; pub use db_factory::DbFactory; +pub use metadata_backend_adapter::{AsKeyBytes, MetadataBackendAdapter}; +mod atomic_db; pub mod mocks; diff --git a/dan_layer/core/src/workers/consensus_worker.rs b/dan_layer/core/src/workers/consensus_worker.rs index 61b25de7c6..da23388a3c 100644 --- a/dan_layer/core/src/workers/consensus_worker.rs +++ b/dan_layer/core/src/workers/consensus_worker.rs @@ -271,6 +271,7 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, } async fn commit(&mut self) -> Result { + let current_checkpoint_num = self.chain_db.get_current_checkpoint_number()?; let mut unit_of_work = self.chain_db.new_unit_of_work(); let mut state = states::CommitState::::new( self.worker.node_address.clone(), @@ -285,6 +286,7 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, &mut self.worker.outbound_service, &self.worker.signing_service, unit_of_work.clone(), + current_checkpoint_num, ) .await?; unit_of_work.commit()?; @@ -313,10 +315,15 @@ impl<'a, T: ServiceSpecification> ConsensusWorkerProcessor<'a, if let Some(mut state_tx) = self.worker.state_db_unit_of_work.take() { state_tx.commit()?; let signatures = state.collected_checkpoint_signatures(); - self.worker - .checkpoint_manager - .create_checkpoint(state_tx.calculate_root()?, signatures) - .await?; + // TODO: Read checkpoint interval from constitution + if self.worker.current_view_id.as_u64() % 50 == 0 { + let checkpoint_number = self.chain_db.get_current_checkpoint_number()?; + self.worker + .checkpoint_manager + .create_checkpoint(checkpoint_number, state_tx.calculate_root()?, signatures) + .await?; + self.chain_db.increment_checkpoint_number()?; + } Ok(res) } else { // technically impossible diff --git a/dan_layer/core/src/workers/states/commit_state.rs b/dan_layer/core/src/workers/states/commit_state.rs index 1ba3a374d8..e6a80b1f90 100644 --- a/dan_layer/core/src/workers/states/commit_state.rs +++ b/dan_layer/core/src/workers/states/commit_state.rs @@ -82,6 +82,7 @@ impl CommitState { outbound_service: &mut TSpecification::OutboundService, signing_service: &TSpecification::SigningService, mut unit_of_work: TUnitOfWork, + checkpoint_number: u64, ) -> Result { self.received_new_view_messages.clear(); let timeout = sleep(timeout); @@ -89,21 +90,21 @@ impl CommitState { loop { tokio::select! { r = inbound_services.wait_for_message(HotStuffMessageType::PreCommit, current_view.view_id()) => { - let (from, message) = r?; - if current_view.is_leader() { - if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service).await?{ - break Ok(result); + let (from, message) = r?; + if current_view.is_leader() { + if let Some(result) = self.process_leader_message(current_view, message.clone(), &from, outbound_service).await?{ + break Ok(result); + } } - } - }, - r = inbound_services.wait_for_qc(HotStuffMessageType::PreCommit, current_view.view_id()) => { - let (from, message) = r?; - let leader = self.committee.leader_for_view(current_view.view_id).clone(); - if let Some(result) = self.process_replica_message(&message, current_view, &from, &leader, outbound_service, signing_service, &mut unit_of_work).await? 
{ - break Ok(result); - } - } - _ = &mut timeout => { + }, + r = inbound_services.wait_for_qc(HotStuffMessageType::PreCommit, current_view.view_id()) => { + let (from, message) = r?; + let leader = self.committee.leader_for_view(current_view.view_id).clone(); + if let Some(result) = self.process_replica_message(&message, current_view, &from, &leader, outbound_service, signing_service, &mut unit_of_work, checkpoint_number).await? { + break Ok(result); + } + } + _ = &mut timeout => { break Ok(ConsensusWorkerStateEvent::TimedOut); } } @@ -166,21 +167,6 @@ impl CommitState { .await } - fn generate_checkpoint_signature(&self) -> SignerSignature { - // TODO: wire in the signer secret (probably node identity) - let signer_secret = PrivateKey::random(&mut OsRng); - // TODO: Validators should have agreed on a checkpoint commitment and included this in the signature for base - // layer validation - let commitment = Commitment::default(); - // TODO: We need the finalized state root to be able to produce a signature - let state_root = FixedHash::zero(); - // TODO: Load next checkpoint number from db - let checkpoint_number = 0; - - let challenge = CheckpointChallenge::new(&self.contract_id, &commitment, state_root, checkpoint_number); - SignerSignature::sign(&signer_secret, challenge) - } - fn create_qc(&self, current_view: &View) -> Option { // TODO: This can be done in one loop instead of two let mut node_hash = None; @@ -217,6 +203,7 @@ impl CommitState { outbound: &mut TSpecification::OutboundService, signing_service: &TSpecification::SigningService, unit_of_work: &mut TUnitOfWork, + checkpoint_number: u64, ) -> Result, DigitalAssetError> { if let Some(justify) = message.justify() { if !justify.matches(HotStuffMessageType::PreCommit, current_view.view_id) { @@ -245,6 +232,7 @@ impl CommitState { view_leader, current_view.view_id, signing_service, + checkpoint_number, ) .await?; Ok(Some(ConsensusWorkerStateEvent::Committed)) @@ -261,8 +249,18 @@ impl CommitState { view_leader: &TSpecification::Addr, view_number: ViewId, signing_service: &TSpecification::SigningService, + checkpoint_number: u64, ) -> Result<(), DigitalAssetError> { - let checkpoint_signature = self.generate_checkpoint_signature(); + // TODO: wire in the signer secret (probably node identity) + let signer_secret = PrivateKey::random(&mut OsRng); + // TODO: Validators should have agreed on a checkpoint commitment and included this in the signature for base + // layer validation + let commitment = Commitment::default(); + // TODO: We need the finalized state root to be able to produce a signature + let state_root = FixedHash::zero(); + + let challenge = CheckpointChallenge::new(&self.contract_id, &commitment, state_root, checkpoint_number); + let checkpoint_signature = SignerSignature::sign(&signer_secret, challenge); let mut message = HotStuffMessage::vote_commit(node, view_number, self.contract_id, checkpoint_signature); message.add_partial_sig(signing_service.sign(&self.node_id, &message.create_signature_challenge())?); outbound.send(self.node_id.clone(), view_leader.clone(), message).await diff --git a/dan_layer/storage_sqlite/Cargo.toml b/dan_layer/storage_sqlite/Cargo.toml index 0eb363fc10..e211645cce 100644 --- a/dan_layer/storage_sqlite/Cargo.toml +++ b/dan_layer/storage_sqlite/Cargo.toml @@ -5,16 +5,16 @@ edition = "2018" license = "BSD-3-Clause" [dependencies] -tari_dan_core = {path="../core"} -tari_common = { path = "../../common"} -tari_common_types = {path = "../../base_layer/common_types"} +tari_dan_core = { path = 
"../core" } +tari_common = { path = "../../common" } +tari_common_types = { path = "../../base_layer/common_types" } tari_utilities = { git = "https://github.com/tari-project/tari_utilities.git", tag = "v0.4.4" } -tari_dan_engine = { path = "../engine"} +tari_dan_engine = { path = "../engine" } -diesel = { version = "1.4.8", features = ["sqlite"] } +diesel = { version = "1.4.8", default-features = false, features = ["sqlite"] } diesel_migrations = "1.4.0" thiserror = "1.0.30" async-trait = "0.1.50" -tokio = { version="1.10", features = ["macros", "time"]} +tokio = { version = "1.10", features = ["macros", "time"] } tokio-stream = { version = "0.1.7", features = ["sync"] } log = { version = "0.4.8", features = ["std"] } diff --git a/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/down.sql b/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/down.sql new file mode 100644 index 0000000000..dfc0d8e01b --- /dev/null +++ b/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/down.sql @@ -0,0 +1 @@ +drop table metadata; \ No newline at end of file diff --git a/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/up.sql b/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/up.sql new file mode 100644 index 0000000000..c94fb1cd2b --- /dev/null +++ b/dan_layer/storage_sqlite/migrations/2022-07-05-131723_create_metadata/up.sql @@ -0,0 +1,5 @@ +create table metadata +( + key blob primary key not null, + value blob not null +) diff --git a/dan_layer/storage_sqlite/src/error.rs b/dan_layer/storage_sqlite/src/error.rs index fc07993ddd..b8a6c8cd44 100644 --- a/dan_layer/storage_sqlite/src/error.rs +++ b/dan_layer/storage_sqlite/src/error.rs @@ -50,6 +50,8 @@ pub enum SqliteStorageError { ModelError(#[from] ModelError), #[error("Conversion error:{reason}")] ConversionError { reason: String }, + #[error("Malformed metadata for key '{key}'")] + MalformedMetadata { key: String }, } impl From for StorageError { diff --git a/dan_layer/storage_sqlite/src/global/models/metadata.rs b/dan_layer/storage_sqlite/src/global/models/metadata.rs index 79075a6c3b..4d587f7a1e 100644 --- a/dan_layer/storage_sqlite/src/global/models/metadata.rs +++ b/dan_layer/storage_sqlite/src/global/models/metadata.rs @@ -25,7 +25,7 @@ use crate::global::schema::*; #[derive(Queryable, Insertable, Identifiable)] #[table_name = "metadata"] #[primary_key(key_name)] -pub struct Metadata { +pub struct GlobalMetadata { pub key_name: Vec, pub value: Vec, } diff --git a/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs b/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs index c7fbd50d4f..4bc3d7e219 100644 --- a/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/global/sqlite_global_db_backend_adapter.rs @@ -28,7 +28,7 @@ use crate::{ error::SqliteStorageError, global::models::{ contract::{Contract, NewContract}, - metadata::Metadata, + metadata::GlobalMetadata, }, SqliteTransaction, }; @@ -74,7 +74,7 @@ impl GlobalDbBackendAdapter for SqliteGlobalDbBackendAdapter { let tx = self.create_transaction()?; match self.get_data_with_connection(&key, &tx) { - Ok(Some(r)) => diesel::update(&Metadata { + Ok(Some(r)) => diesel::update(&GlobalMetadata { key_name: key.as_key_bytes().to_vec(), value: r, }) @@ -103,7 +103,7 @@ impl GlobalDbBackendAdapter for SqliteGlobalDbBackendAdapter { use crate::global::schema::metadata::dsl; let connection = 
SqliteConnection::establish(self.database_url.as_str())?; - let row: Option = dsl::metadata + let row: Option = dsl::metadata .find(key.as_key_bytes()) .first(&connection) .optional() @@ -122,7 +122,7 @@ impl GlobalDbBackendAdapter for SqliteGlobalDbBackendAdapter { ) -> Result>, Self::Error> { use crate::global::schema::metadata::dsl; - let row: Option = dsl::metadata + let row: Option = dsl::metadata .find(key.as_key_bytes()) .first(tx.connection()) .optional() diff --git a/dan_layer/storage_sqlite/src/models/metadata.rs b/dan_layer/storage_sqlite/src/models/metadata.rs new file mode 100644 index 0000000000..f8853174a9 --- /dev/null +++ b/dan_layer/storage_sqlite/src/models/metadata.rs @@ -0,0 +1,31 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::schema::metadata; + +#[derive(Queryable, Insertable, Identifiable)] +#[table_name = "metadata"] +#[primary_key(key)] +pub struct Metadata { + pub key: Vec, + pub value: Vec, +} diff --git a/dan_layer/storage_sqlite/src/models/mod.rs b/dan_layer/storage_sqlite/src/models/mod.rs index bc30cc1599..b12e4238a0 100644 --- a/dan_layer/storage_sqlite/src/models/mod.rs +++ b/dan_layer/storage_sqlite/src/models/mod.rs @@ -22,6 +22,7 @@ pub mod instruction; pub mod locked_qc; +pub mod metadata; pub mod node; pub mod prepare_qc; pub mod state_key; diff --git a/dan_layer/storage_sqlite/src/schema.rs b/dan_layer/storage_sqlite/src/schema.rs index 0c2761b269..746f8b4efa 100644 --- a/dan_layer/storage_sqlite/src/schema.rs +++ b/dan_layer/storage_sqlite/src/schema.rs @@ -1,24 +1,24 @@ -// Copyright 2022. The Tari Project +// Copyright 2022. The Tari Project // -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: // -// 1. 
Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. // -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. // -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. table! { instructions (id) { @@ -42,6 +42,13 @@ table! { } } +table! { + metadata (key) { + key -> Binary, + value -> Binary, + } +} + table! { nodes (id) { id -> Integer, @@ -96,6 +103,7 @@ joinable!(instructions -> nodes (node_id)); allow_tables_to_appear_in_same_query!( instructions, locked_qc, + metadata, nodes, prepare_qc, state_keys, diff --git a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs index f48f95618f..ec82bf75e0 100644 --- a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs @@ -20,21 +20,30 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::convert::{TryFrom, TryInto}; +use std::{ + convert::{TryFrom, TryInto}, + fmt::Display, +}; use diesel::{prelude::*, Connection, SqliteConnection}; use log::*; use tari_dan_core::{ models::{HotStuffMessageType, QuorumCertificate, TariDanPayload, TreeNodeHash, ValidatorSignature, ViewId}, - storage::chain::{ChainDbBackendAdapter, DbInstruction, DbNode, DbQc}, + storage::{ + chain::{ChainDbBackendAdapter, DbInstruction, DbNode, DbQc}, + AsKeyBytes, + AtomicDb, + MetadataBackendAdapter, + }, }; -use tari_utilities::ByteArray; +use tari_utilities::{message_format::MessageFormat, ByteArray}; use crate::{ error::SqliteStorageError, models::{ instruction::{Instruction, NewInstruction}, locked_qc::LockedQc, + metadata::Metadata, node::{NewNode, Node}, prepare_qc::PrepareQc, }, @@ -59,26 +68,11 @@ impl SqliteChainBackendAdapter { } } -impl ChainDbBackendAdapter for SqliteChainBackendAdapter { - type BackendTransaction = SqliteTransaction; +impl AtomicDb for SqliteChainBackendAdapter { + type DbTransaction = SqliteTransaction; type Error = SqliteStorageError; - type Id = i32; - type Payload = TariDanPayload; - - fn is_empty(&self) -> Result { - let connection = self.get_connection()?; - let n: Option = - nodes::table - .first(&connection) - .optional() - .map_err(|source| SqliteStorageError::DieselError { - source, - operation: "is_empty".to_string(), - })?; - Ok(n.is_none()) - } - fn create_transaction(&self) -> Result { + fn create_transaction(&self) -> Result { let connection = self.get_connection()?; connection .execute("PRAGMA foreign_keys = ON;") @@ -96,6 +90,36 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(SqliteTransaction::new(connection)) } + fn commit(&self, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { + debug!(target: LOG_TARGET, "Committing transaction"); + transaction + .connection() + .execute("COMMIT TRANSACTION") + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "commit::chain".to_string(), + })?; + Ok(()) + } +} + +impl ChainDbBackendAdapter for SqliteChainBackendAdapter { + type Id = i32; + type Payload = TariDanPayload; + + fn is_empty(&self) -> Result { + let connection = self.get_connection()?; + let n: Option = + nodes::table + .first(&connection) + .optional() + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "is_empty".to_string(), + })?; + Ok(n.is_none()) + } + fn node_exists(&self, node_hash: &TreeNodeHash) -> Result { let connection = self.get_connection()?; use crate::schema::nodes::dsl; @@ -137,7 +161,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } } - fn insert_node(&self, item: &DbNode, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn insert_node(&self, item: &DbNode, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { debug!(target: LOG_TARGET, "Inserting {:?}", item); #[allow(clippy::cast_possible_wrap)] let new_node = NewNode { @@ -155,12 +179,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(()) } - fn update_node( - &self, - id: &Self::Id, - item: &DbNode, - transaction: &Self::BackendTransaction, - ) -> Result<(), Self::Error> { + fn update_node(&self, id: &Self::Id, item: &DbNode, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { use crate::schema::nodes::dsl; // Should not be allowed to update hash, parent and height diesel::update(dsl::nodes.find(id)) @@ -179,7 +198,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } #[allow(clippy::cast_possible_wrap)] - fn 
update_locked_qc(&self, item: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn update_locked_qc(&self, item: &DbQc, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { use crate::schema::locked_qc::dsl; let message_type = i32::from(item.message_type.as_u8()); let existing: Result = dsl::locked_qc.find(1).first(transaction.connection()); @@ -218,7 +237,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } #[allow(clippy::cast_possible_wrap)] - fn update_prepare_qc(&self, item: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + fn update_prepare_qc(&self, item: &DbQc, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { use crate::schema::prepare_qc::dsl; let message_type = i32::from(item.message_type.as_u8()); let existing: Result = dsl::prepare_qc.find(1).first(transaction.connection()); @@ -279,18 +298,6 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { .transpose() } - fn commit(&self, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { - debug!(target: LOG_TARGET, "Committing transaction"); - transaction - .connection() - .execute("COMMIT TRANSACTION") - .map_err(|source| SqliteStorageError::DieselError { - source, - operation: "commit::chain".to_string(), - })?; - Ok(()) - } - fn locked_qc_id(&self) -> Self::Id { 1 } @@ -406,11 +413,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } } - fn insert_instruction( - &self, - item: &DbInstruction, - transaction: &Self::BackendTransaction, - ) -> Result<(), Self::Error> { + fn insert_instruction(&self, item: &DbInstruction, transaction: &Self::DbTransaction) -> Result<(), Self::Error> { use crate::schema::nodes::dsl; // TODO: this could be made more efficient let node: Node = dsl::nodes @@ -469,3 +472,76 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(instructions) } } + +impl MetadataBackendAdapter for SqliteChainBackendAdapter { + fn get_metadata( + &self, + key: &K, + transaction: &Self::DbTransaction, + ) -> Result, Self::Error> { + use crate::schema::metadata::dsl; + debug!(target: LOG_TARGET, "get_metadata: key = {}", key); + let value = dsl::metadata + .select(metadata::value) + .filter(metadata::key.eq(key.as_key_bytes())) + .first::>(transaction.connection()) + .optional() + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "chain_db::get_metadata".to_string(), + })?; + + value + .map(|v| T::from_binary(&v)) + .transpose() + .map_err(|_| SqliteStorageError::MalformedMetadata { key: key.to_string() }) + } + + fn set_metadata(&self, key: K, value: T, tx: &Self::DbTransaction) -> Result<(), Self::Error> { + use crate::schema::metadata::dsl; + debug!(target: LOG_TARGET, "set_metadata: key = {}", key); + let value = value + .to_binary() + .map_err(|_| SqliteStorageError::MalformedMetadata { key: key.to_string() })?; + + // One day we will have upsert in diesel + if self.metadata_key_exists(&key, tx)? 
{ + debug!(target: LOG_TARGET, "update_metadata: key = {}", key); + diesel::update(metadata::table.filter(dsl::key.eq(key.as_key_bytes()))) + .set(metadata::value.eq(value)) + .execute(tx.connection()) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "chain_db::set_metadata".to_string(), + })?; + } else { + debug!(target: LOG_TARGET, "insert_metadata: key = {}", key); + diesel::insert_into(metadata::table) + .values(Metadata { + key: key.as_key_bytes().to_vec(), + value, + }) + .execute(tx.connection()) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "chain_db::set_metadata".to_string(), + })?; + } + + Ok(()) + } + + fn metadata_key_exists(&self, key: &K, transaction: &Self::DbTransaction) -> Result { + use crate::schema::metadata::dsl; + let v = dsl::metadata + .select(metadata::key) + .filter(metadata::key.eq(key.as_key_bytes())) + .first::>(transaction.connection()) + .optional() + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "chain_db::get_metadata".to_string(), + })?; + Ok(v.is_some()) + } +}
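The heart of this change is the split between the new `AtomicDb` transaction trait, the `MetadataBackendAdapter` key/value trait, and the `ChainDb::increment_checkpoint_number` helper built on top of them. The following is a minimal, self-contained Rust sketch of that read-then-write flow, not the crate code: the real traits are generic over an `AsKeyBytes` key and a `MessageFormat` value and return `Result`s, the real transaction is a SQLite transaction, and `MemoryAdapter` here is a hypothetical stand-in for `SqliteChainBackendAdapter`. Only the key string `"checkpoint-number"`, the default-to-zero read, and the increment-then-commit order are taken from the patch.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Simplified stand-ins for the traits added in this patch. The real versions live in
// tari_dan_core::storage and carry an associated Error type and generic key/value types.
trait AtomicDb {
    type DbTransaction;
    fn create_transaction(&self) -> Self::DbTransaction;
    fn commit(&self, tx: &Self::DbTransaction);
}

trait MetadataBackendAdapter: AtomicDb {
    fn get_metadata(&self, key: &str, tx: &Self::DbTransaction) -> Option<u64>;
    fn set_metadata(&self, key: &str, value: u64, tx: &Self::DbTransaction);
}

// Toy in-memory adapter (hypothetical), standing in for the SQLite/mock backend adapters.
#[derive(Default)]
struct MemoryAdapter {
    metadata: RefCell<HashMap<String, u64>>,
}

impl AtomicDb for MemoryAdapter {
    type DbTransaction = ();

    fn create_transaction(&self) -> Self::DbTransaction {}

    fn commit(&self, _tx: &Self::DbTransaction) {}
}

impl MetadataBackendAdapter for MemoryAdapter {
    fn get_metadata(&self, key: &str, _tx: &Self::DbTransaction) -> Option<u64> {
        self.metadata.borrow().get(key).copied()
    }

    fn set_metadata(&self, key: &str, value: u64, _tx: &Self::DbTransaction) {
        self.metadata.borrow_mut().insert(key.to_string(), value);
    }
}

// Mirrors ChainDb::increment_checkpoint_number: read the stored number (defaulting to 0),
// write back n + 1 inside the same transaction, commit, and return the new value.
// "checkpoint-number" matches the key bytes of ChainDbMetadataKey::CheckpointNumber.
fn increment_checkpoint_number<A: MetadataBackendAdapter>(adapter: &A) -> u64 {
    let tx = adapter.create_transaction();
    let n = adapter.get_metadata("checkpoint-number", &tx).unwrap_or(0);
    let next = n + 1;
    adapter.set_metadata("checkpoint-number", next, &tx);
    adapter.commit(&tx);
    next
}

fn main() {
    let adapter = MemoryAdapter::default();
    assert_eq!(increment_checkpoint_number(&adapter), 1);
    assert_eq!(increment_checkpoint_number(&adapter), 2);
}
```

In the patch itself, `ConsensusWorkerProcessor` reads the current number with `get_current_checkpoint_number` before submitting a checkpoint and calls `increment_checkpoint_number` only after the checkpoint has been created, which is what keeps submitted checkpoints numbered 0, 1, 2, ... in order.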