From 9279de5002ad266526b9c0cae08eb37835bec7b1 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Tue, 5 Apr 2022 14:40:04 +0300 Subject: [PATCH] refactor: add lock error to the StorageError (#3998) Description --- These changes remove unwrapping grpc connections from options in every method. Also it adds the `LockError` variant to the `StorageError`. Motivation and Context --- Reducing amount of `unwrap` calls, replace with `Result` when possible. How Has This Been Tested? --- CI --- .../src/grpc/services/base_node_client.rs | 50 +++++++------------ .../src/grpc/services/wallet_client.rs | 24 +++++---- dan_layer/core/src/services/asset_proxy.rs | 2 +- .../core/src/services/base_node_client.rs | 2 +- .../src/services/service_specification.rs | 22 +++----- .../src/services/validator_node_rpc_client.rs | 6 +-- .../storage/chain/chain_db_unit_of_work.rs | 18 +++---- dan_layer/core/src/storage/error.rs | 10 +++- dan_layer/core/src/storage/mocks/chain_db.rs | 28 +++++------ .../storage/state/state_db_unit_of_work.rs | 16 +++--- 10 files changed, 83 insertions(+), 95 deletions(-) diff --git a/applications/tari_validator_node/src/grpc/services/base_node_client.rs b/applications/tari_validator_node/src/grpc/services/base_node_client.rs index b31bc98874..46a1e9975d 100644 --- a/applications/tari_validator_node/src/grpc/services/base_node_client.rs +++ b/applications/tari_validator_node/src/grpc/services/base_node_client.rs @@ -35,10 +35,12 @@ use tari_dan_core::{ const LOG_TARGET: &str = "tari::validator_node::app"; +type Inner = grpc::base_node_client::BaseNodeClient; + #[derive(Clone)] pub struct GrpcBaseNodeClient { endpoint: SocketAddr, - inner: Option>, + inner: Option, } impl GrpcBaseNodeClient { @@ -46,21 +48,21 @@ impl GrpcBaseNodeClient { Self { endpoint, inner: None } } - pub async fn connect(&mut self) -> Result<(), DigitalAssetError> { - self.inner = Some(grpc::base_node_client::BaseNodeClient::connect(format!("http://{}", self.endpoint)).await?); - Ok(()) + pub async fn connection(&mut self) -> Result<&mut Inner, DigitalAssetError> { + if self.inner.is_none() { + let url = format!("http://{}", self.endpoint); + let inner = Inner::connect(url).await?; + self.inner = Some(inner); + } + self.inner + .as_mut() + .ok_or_else(|| DigitalAssetError::FatalError("no connection".into())) } } #[async_trait] impl BaseNodeClient for GrpcBaseNodeClient { async fn get_tip_info(&mut self) -> Result { - let inner = match self.inner.as_mut() { - Some(i) => i, - None => { - self.connect().await?; - self.inner.as_mut().unwrap() - }, - }; + let inner = self.connection().await?; let request = grpc::Empty {}; let result = inner.get_tip_info(request).await?.into_inner(); Ok(BaseLayerMetadata { @@ -74,13 +76,7 @@ impl BaseNodeClient for GrpcBaseNodeClient { asset_public_key: PublicKey, checkpoint_unique_id: Vec, ) -> Result, DigitalAssetError> { - let inner = match self.inner.as_mut() { - Some(i) => i, - None => { - self.connect().await?; - self.inner.as_mut().unwrap() - }, - }; + let inner = self.connection().await?; let request = grpc::GetTokensRequest { asset_public_key: asset_public_key.as_bytes().to_vec(), unique_ids: vec![checkpoint_unique_id], @@ -140,13 +136,7 @@ impl BaseNodeClient for GrpcBaseNodeClient { &mut self, dan_node_public_key: PublicKey, ) -> Result, DigitalAssetError> { - let inner = match self.inner.as_mut() { - Some(i) => i, - None => { - self.connect().await?; - self.inner.as_mut().unwrap() - }, - }; + let inner = self.connection().await?; // TODO: probably should use output mmr indexes here let request = grpc::ListAssetRegistrationsRequest { offset: 0, count: 100 }; let mut result = inner.list_asset_registrations(request).await?.into_inner(); @@ -198,18 +188,12 @@ impl BaseNodeClient for GrpcBaseNodeClient { &mut self, asset_public_key: PublicKey, ) -> Result, DigitalAssetError> { - let conn = match self.inner.as_mut() { - Some(i) => i, - None => { - self.connect().await?; - self.inner.as_mut().unwrap() - }, - }; + let inner = self.connection().await?; let req = grpc::GetAssetMetadataRequest { asset_public_key: asset_public_key.to_vec(), }; - let output = conn.get_asset_metadata(req).await.unwrap().into_inner(); + let output = inner.get_asset_metadata(req).await.unwrap().into_inner(); let mined_height = output.mined_height; let output = output diff --git a/applications/tari_validator_node/src/grpc/services/wallet_client.rs b/applications/tari_validator_node/src/grpc/services/wallet_client.rs index 616190fff4..4bb47d1e82 100644 --- a/applications/tari_validator_node/src/grpc/services/wallet_client.rs +++ b/applications/tari_validator_node/src/grpc/services/wallet_client.rs @@ -29,10 +29,12 @@ use tari_comms::types::CommsPublicKey; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{models::StateRoot, services::WalletClient, DigitalAssetError}; +type Inner = grpc::wallet_client::WalletClient; + #[derive(Clone)] pub struct GrpcWalletClient { endpoint: SocketAddr, - inner: Option>, + inner: Option, } impl GrpcWalletClient { @@ -40,9 +42,15 @@ impl GrpcWalletClient { Self { endpoint, inner: None } } - pub async fn connect(&mut self) -> Result<(), DigitalAssetError> { - self.inner = Some(grpc::wallet_client::WalletClient::connect(format!("http://{}", self.endpoint)).await?); - Ok(()) + pub async fn connection(&mut self) -> Result<&mut Inner, DigitalAssetError> { + if self.inner.is_none() { + let url = format!("http://{}", self.endpoint); + let inner = Inner::connect(url).await?; + self.inner = Some(inner); + } + self.inner + .as_mut() + .ok_or_else(|| DigitalAssetError::FatalError("no connection".into())) } } @@ -55,13 +63,7 @@ impl WalletClient for GrpcWalletClient { state_root: &StateRoot, next_committee: Vec, ) -> Result<(), DigitalAssetError> { - let inner = match self.inner.as_mut() { - Some(i) => i, - None => { - self.connect().await?; - self.inner.as_mut().unwrap() - }, - }; + let inner = self.connection().await?; let request = CreateFollowOnAssetCheckpointRequest { asset_public_key: asset_public_key.as_bytes().to_vec(), diff --git a/dan_layer/core/src/services/asset_proxy.rs b/dan_layer/core/src/services/asset_proxy.rs index 04aec04df9..6dc34917e5 100644 --- a/dan_layer/core/src/services/asset_proxy.rs +++ b/dan_layer/core/src/services/asset_proxy.rs @@ -43,7 +43,7 @@ use crate::{ const LOG_TARGET: &str = "tari::dan_layer::core::services::asset_proxy"; #[async_trait] -pub trait AssetProxy { +pub trait AssetProxy: Send + Sync { async fn invoke_method( &self, asset_public_key: &PublicKey, diff --git a/dan_layer/core/src/services/base_node_client.rs b/dan_layer/core/src/services/base_node_client.rs index c6fae1207e..29547e290c 100644 --- a/dan_layer/core/src/services/base_node_client.rs +++ b/dan_layer/core/src/services/base_node_client.rs @@ -29,7 +29,7 @@ use crate::{ }; #[async_trait] -pub trait BaseNodeClient { +pub trait BaseNodeClient: Send + Sync { async fn get_tip_info(&mut self) -> Result; async fn get_current_checkpoint( diff --git a/dan_layer/core/src/services/service_specification.rs b/dan_layer/core/src/services/service_specification.rs index e2a5583d4a..8b493dd566 100644 --- a/dan_layer/core/src/services/service_specification.rs +++ b/dan_layer/core/src/services/service_specification.rs @@ -45,9 +45,9 @@ use crate::{ /// This trait is intended to only include `types` and no methods. pub trait ServiceSpecification: Default + Clone { type Addr: NodeAddressable; - type AssetProcessor: AssetProcessor + Clone + Sync + Send + 'static; - type AssetProxy: AssetProxy + Clone + Sync + Send + 'static; - type BaseNodeClient: BaseNodeClient + Clone + Sync + Send + 'static; + type AssetProcessor: AssetProcessor + Clone; + type AssetProxy: AssetProxy + Clone; + type BaseNodeClient: BaseNodeClient + Clone; type ChainDbBackendAdapter: ChainDbBackendAdapter; type ChainStorageService: ChainStorageService; type CheckpointManager: CheckpointManager; @@ -55,22 +55,16 @@ pub trait ServiceSpecification: Default + Clone { type DbFactory: DbFactory< StateDbBackendAdapter = Self::StateDbBackendAdapter, ChainDbBackendAdapter = Self::ChainDbBackendAdapter, - > + Clone - + Sync - + Send - + 'static; + > + Clone; type EventsPublisher: EventsPublisher; - type InboundConnectionService: InboundConnectionService - + 'static - + Send - + Sync; - type MempoolService: MempoolService + Clone + Sync + Send + 'static; + type InboundConnectionService: InboundConnectionService; + type MempoolService: MempoolService + Clone; type OutboundService: OutboundService; type Payload: Payload; type PayloadProcessor: PayloadProcessor; type PayloadProvider: PayloadProvider; type SigningService: SigningService; type StateDbBackendAdapter: StateDbBackendAdapter; - type ValidatorNodeClientFactory: ValidatorNodeClientFactory + Clone + Sync + Send + 'static; - type WalletClient: WalletClient + Clone + Sync + Send + 'static; + type ValidatorNodeClientFactory: ValidatorNodeClientFactory + Clone; + type WalletClient: WalletClient + Clone; } diff --git a/dan_layer/core/src/services/validator_node_rpc_client.rs b/dan_layer/core/src/services/validator_node_rpc_client.rs index 1215f518bf..9152077c3f 100644 --- a/dan_layer/core/src/services/validator_node_rpc_client.rs +++ b/dan_layer/core/src/services/validator_node_rpc_client.rs @@ -34,14 +34,14 @@ use crate::{ services::infrastructure_services::NodeAddressable, }; -pub trait ValidatorNodeClientFactory { +pub trait ValidatorNodeClientFactory: Send + Sync { type Addr: NodeAddressable; - type Client: ValidatorNodeRpcClient + Sync + Send; + type Client: ValidatorNodeRpcClient; fn create_client(&self, address: &Self::Addr) -> Self::Client; } #[async_trait] -pub trait ValidatorNodeRpcClient { +pub trait ValidatorNodeRpcClient: Send + Sync { async fn invoke_read_method( &mut self, asset_public_key: &PublicKey, diff --git a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs index 1f13a37b5c..78b28c3666 100644 --- a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs @@ -79,7 +79,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf // } fn commit(&mut self) -> Result<(), StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; let tx = inner .backend_adapter .create_transaction() @@ -143,7 +143,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn add_node(&mut self, hash: TreeNodeHash, parent: TreeNodeHash, height: u32) -> Result<(), StorageError> { - self.inner.write().unwrap().nodes.push(( + self.inner.write()?.nodes.push(( None, UnitOfWorkTracker::new( DbNode { @@ -159,7 +159,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn add_instruction(&mut self, node_hash: TreeNodeHash, instruction: Instruction) -> Result<(), StorageError> { - self.inner.write().unwrap().instructions.push(( + self.inner.write()?.instructions.push(( None, UnitOfWorkTracker::new(DbInstruction { node_hash, instruction }, true), )); @@ -167,7 +167,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn get_locked_qc(&mut self) -> Result { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; if let Some(locked_qc) = &inner.locked_qc { let locked_qc = locked_qc.get(); @@ -197,7 +197,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn set_locked_qc(&mut self, qc: &QuorumCertificate) -> Result<(), StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; if let Some(locked_qc) = &inner.locked_qc.as_ref() { let mut locked_qc = locked_qc.get_mut(); @@ -230,7 +230,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn get_prepare_qc(&mut self) -> Result, StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; if let Some(prepare_qc) = &inner.prepare_qc { let prepare_qc = prepare_qc.get(); @@ -265,7 +265,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf fn set_prepare_qc(&mut self, qc: &QuorumCertificate) -> Result<(), StorageError> { // put it in the tracker let _ = self.get_prepare_qc()?; - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; match inner.prepare_qc.as_mut() { None => { inner.prepare_qc = Some(UnitOfWorkTracker::new( @@ -291,7 +291,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn commit_node(&mut self, node_hash: &TreeNodeHash) -> Result<(), StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; let found_node = inner.find_proposed_node(node_hash)?; let mut node = found_node.1.get_mut(); node.is_committed = true; @@ -299,7 +299,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf } fn get_tip_node(&self) -> Result, StorageError> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; inner.get_tip_node() } } diff --git a/dan_layer/core/src/storage/error.rs b/dan_layer/core/src/storage/error.rs index dd6dc11f2f..8fc2705705 100644 --- a/dan_layer/core/src/storage/error.rs +++ b/dan_layer/core/src/storage/error.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::io; +use std::{io, sync::PoisonError}; use lmdb_zero as lmdb; use tari_mmr::error::MerkleMountainRangeError; @@ -52,4 +52,12 @@ pub enum StorageError { MerkleMountainRangeError(#[from] MerkleMountainRangeError), #[error("General storage error: {details}")] General { details: String }, + #[error("Lock error")] + LockError, +} + +impl From> for StorageError { + fn from(_err: PoisonError) -> Self { + Self::LockError + } } diff --git a/dan_layer/core/src/storage/mocks/chain_db.rs b/dan_layer/core/src/storage/mocks/chain_db.rs index 70eeb830a0..2ee2caa636 100644 --- a/dan_layer/core/src/storage/mocks/chain_db.rs +++ b/dan_layer/core/src/storage/mocks/chain_db.rs @@ -49,12 +49,12 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { type Payload = String; fn is_empty(&self) -> Result { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; Ok(lock.nodes.is_empty()) } fn node_exists(&self, node_hash: &TreeNodeHash) -> Result { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let exists = lock.nodes.rows().any(|rec| rec.hash == *node_hash); Ok(exists) } @@ -64,13 +64,13 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn insert_node(&self, node: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { - let mut lock = self.db.write().unwrap(); + let mut lock = self.db.write()?; lock.nodes.insert(node.clone()); Ok(()) } fn update_node(&self, id: &Self::Id, item: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { - let mut lock = self.db.write().unwrap(); + let mut lock = self.db.write()?; if lock.nodes.update(*id, item.clone()) { Ok(()) } else { @@ -79,7 +79,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn insert_instruction(&self, item: &DbInstruction, _: &Self::BackendTransaction) -> Result<(), Self::Error> { - let mut lock = self.db.write().unwrap(); + let mut lock = self.db.write()?; lock.instructions.insert(item.clone()); Ok(()) } @@ -97,7 +97,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn find_highest_prepared_qc(&self) -> Result { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let highest = lock .prepare_qc .rows() @@ -112,21 +112,21 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn get_locked_qc(&self) -> Result { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; // FIXME: when this implementation is finalized in sqlite/lmdb impl let rec = lock.locked_qc.rows().next().cloned().map(Into::into).unwrap(); Ok(rec) } fn get_prepare_qc(&self) -> Result, Self::Error> { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; // FIXME: when this implementation is finalized in sqlite/lmdb impl let rec = lock.prepare_qc.rows().next().cloned().map(Into::into); Ok(rec) } fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error> { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let recs = lock .nodes .records() @@ -136,7 +136,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn find_node_by_parent_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error> { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let rec = lock .nodes .records() @@ -146,7 +146,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn find_all_instructions_by_node(&self, node_id: Self::Id) -> Result, Self::Error> { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let node = lock.nodes.get(node_id).ok_or(StorageError::NotFound)?; let recs = lock .instructions @@ -158,7 +158,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn update_prepare_qc(&self, item: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { - let mut lock = self.db.write().unwrap(); + let mut lock = self.db.write()?; let id = lock .prepare_qc .records() @@ -170,7 +170,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn update_locked_qc(&self, locked_qc: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { - let mut lock = self.db.write().unwrap(); + let mut lock = self.db.write()?; let id = lock .locked_qc .records() @@ -182,7 +182,7 @@ impl ChainDbBackendAdapter for MockChainDbBackupAdapter { } fn get_tip_node(&self) -> Result, Self::Error> { - let lock = self.db.read().unwrap(); + let lock = self.db.read()?; let found = lock .nodes .rows() diff --git a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs index 6a5c760229..5901c4897b 100644 --- a/dan_layer/core/src/storage/state/state_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/state/state_db_unit_of_work.rs @@ -109,7 +109,7 @@ impl Clone for StateDbUnitOfWorkImpl StateDbUnitOfWork for StateDbUnitOfWorkImpl { fn set_value(&mut self, schema: String, key: Vec, value: Vec) -> Result<(), StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; inner .updates .push(UnitOfWorkTracker::new(DbKeyValue { schema, key, value }, true)); @@ -122,7 +122,7 @@ impl StateDbUnitOfWork for StateDbUnitOf } fn commit(&mut self) -> Result<(), StorageError> { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write()?; let tx = inner .backend_adapter .create_transaction() @@ -167,7 +167,7 @@ impl StateDbUnitOfWork for StateDbUnitOf /// Clears the state db immediately (before commit) - this will not be needed in future when build up the state from /// instructions/op logs fn clear_all_state(&self) -> Result<(), StorageError> { - let inner = self.inner.write().unwrap(); + let inner = self.inner.write()?; let tx = inner .backend_adapter .create_transaction() @@ -185,7 +185,7 @@ impl StateDbUnitOfWorkReader for StateDb } fn get_value(&self, schema: &str, key: &[u8]) -> Result>, StorageError> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; // Hit the DB. inner .backend_adapter @@ -206,7 +206,7 @@ impl StateDbUnitOfWorkReader for StateDb } fn find_keys_by_value(&self, schema: &str, value: &[u8]) -> Result>, StorageError> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; inner .backend_adapter .find_keys_by_value(schema, value) @@ -214,7 +214,7 @@ impl StateDbUnitOfWorkReader for StateDb } fn calculate_root(&self) -> Result { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; let tx = inner .backend_adapter .create_transaction() @@ -267,7 +267,7 @@ impl StateDbUnitOfWorkReader for StateDb } fn get_all_state(&self) -> Result, StorageError> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; let tx = inner .backend_adapter .create_transaction() @@ -298,7 +298,7 @@ impl StateDbUnitOfWorkReader for StateDb } fn get_op_logs_for_height(&self, height: u64) -> Result, StorageError> { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read()?; let tx = inner .backend_adapter .create_transaction()