Skip to content

Commit

Permalink
Merge branch 'development' into core-output-features-canonical-encoding
Browse files Browse the repository at this point in the history
* development:
  fix: fix confusing names in get_balance functions (tari-project#3447)
  feat: add sql query to obtain balance (tari-project#3446)
  fix: u64->i64->u64 conversion; chain split height as u64 (tari-project#3442)
  • Loading branch information
sdbondi committed Oct 12, 2021
2 parents b43270e + 6cd9228 commit 95a7868
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 107 deletions.
33 changes: 24 additions & 9 deletions applications/tari_base_node/src/grpc/base_node_grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,19 +324,34 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer {
let num_headers = cmp::min(num_headers, LIST_HEADERS_MAX_NUM_HEADERS);
let (mut tx, rx) = mpsc::channel(LIST_HEADERS_PAGE_SIZE);

let headers: Vec<u64> = if request.from_height != 0 {
let from_height = cmp::min(request.from_height, tip);

let headers: Vec<u64> = if from_height != 0 {
match sorting {
Sorting::Desc => ((cmp::max(0, request.from_height as i64 - num_headers as i64 + 1) as u64)..=
request.from_height)
.rev()
.collect(),
Sorting::Asc => (request.from_height..(request.from_height + num_headers)).collect(),
Sorting::Desc => {
let from = match from_height.overflowing_sub(num_headers) {
(_, true) => 0,
(res, false) => res + 1,
};
(from..=from_height).rev().collect()
},
Sorting::Asc => {
let to = match from_height.overflowing_add(num_headers) {
(_, true) => u64::MAX,
(res, false) => res,
};
(from_height..to).collect()
},
}
} else {
match sorting {
Sorting::Desc => ((cmp::max(0, tip as i64 - num_headers as i64 + 1) as u64)..=tip)
.rev()
.collect(),
Sorting::Desc => {
let from = match tip.overflowing_sub(num_headers) {
(_, true) => 0,
(res, false) => res + 1,
};
(from..=tip).rev().collect()
},
Sorting::Asc => (0..num_headers).collect(),
}
};
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ message FindChainSplitResponse {
// An ordered list of headers starting from next header after the matching hash, up until `FindChainSplitRequest::count`
repeated tari.core.BlockHeader headers = 1;
// The index of the hash that matched from `FindChainSplitRequest::block_hashes`. This value could also be used to know how far back a split occurs.
uint32 fork_hash_index = 2;
uint64 fork_hash_index = 2;
/// The current header height of this node
uint64 tip_height = 3;
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/header_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub enum BlockHeaderSyncError {
#[error("Sync failed for all peers")]
SyncFailedAllPeers,
#[error("Peer sent a found hash index that was out of range (Expected less than {0}, Found: {1})")]
FoundHashIndexOutOfRange(u32, u32),
FoundHashIndexOutOfRange(u64, u64),
#[error("Failed to ban peer: {0}")]
FailedToBan(ConnectivityError),
#[error("Connectivity Error: {0}")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,15 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
peer
);

if fork_hash_index >= block_hashes.len() as u32 {
if fork_hash_index >= block_hashes.len() as u64 {
let _ = self
.ban_peer_long(peer.clone(), BanReason::SplitHashGreaterThanHashes {
fork_hash_index,
num_block_hashes: block_hashes.len(),
})
.await;
return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
block_hashes.len() as u32,
block_hashes.len() as u64,
fork_hash_index,
));
}
Expand Down Expand Up @@ -658,7 +658,7 @@ enum BanReason {
({num_block_hashes})"
)]
SplitHashGreaterThanHashes {
fork_hash_index: u32,
fork_hash_index: u64,
num_block_hashes: usize,
},
#[error("Peer sent invalid header: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

Ok(Response::new(FindChainSplitResponse {
fork_hash_index: idx as u32,
fork_hash_index: idx as u64,
headers: headers.into_iter().map(Into::into).collect(),
tip_height: metadata.height_of_longest_chain(),
}))
Expand Down
19 changes: 13 additions & 6 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ where
.await
.map(|_| OutputManagerResponse::OutputMetadataSignatureUpdated),
OutputManagerRequest::GetBalance => {
let current_chain_tip = match self.base_node_service.get_chain_metadata().await {
let current_tip_for_time_lock_calculation = match self.base_node_service.get_chain_metadata().await {
Ok(metadata) => metadata.map(|m| m.height_of_longest_chain()),
Err(_) => None,
};
self.get_balance(current_chain_tip)
self.get_balance(current_tip_for_time_lock_calculation)
.await
.map(OutputManagerResponse::Balance)
},
Expand Down Expand Up @@ -406,8 +406,15 @@ where
Ok(())
}

async fn get_balance(&self, current_chain_tip: Option<u64>) -> Result<Balance, OutputManagerError> {
let balance = self.resources.db.get_balance(current_chain_tip).await?;
async fn get_balance(
&self,
current_tip_for_time_lock_calculation: Option<u64>,
) -> Result<Balance, OutputManagerError> {
let balance = self
.resources
.db
.get_balance(current_tip_for_time_lock_calculation)
.await?;
trace!(target: LOG_TARGET, "Balance: {:?}", balance);
Ok(balance)
}
Expand Down Expand Up @@ -965,8 +972,8 @@ where
let enough_spendable = utxos_total_value > amount + fee_with_change;

if !perfect_utxo_selection && !enough_spendable {
let current_chain_tip = chain_metadata.map(|cm| cm.height_of_longest_chain());
let balance = self.get_balance(current_chain_tip).await?;
let current_tip_for_time_lock_calculation = chain_metadata.map(|cm| cm.height_of_longest_chain());
let balance = self.get_balance(current_tip_for_time_lock_calculation).await?;
let pending_incoming = balance.pending_incoming_balance;
if utxos_total_value + pending_incoming >= amount + fee_with_change {
return Err(OutputManagerError::FundsPending);
Expand Down
77 changes: 12 additions & 65 deletions base_layer/wallet/src/output_manager_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{
sync::Arc,
};
use tari_common_types::types::{BlindingFactor, Commitment, HashOutput, PrivateKey};
use tari_core::transactions::{tari_amount::MicroTari, transaction::TransactionOutput};
use tari_core::transactions::transaction::TransactionOutput;

const LOG_TARGET: &str = "wallet::output_manager_service::database";

Expand All @@ -51,7 +51,6 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
/// Modify the state the of the backend with a write operation
fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, OutputManagerStorageError>;
fn fetch_pending_incoming_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;
fn fetch_pending_outgoing_outputs(&self) -> Result<Vec<DbUnblindedOutput>, OutputManagerStorageError>;

fn set_received_output_mined_height(
&self,
Expand Down Expand Up @@ -119,6 +118,11 @@ pub trait OutputManagerBackend: Send + Sync + Clone {
fn set_coinbase_abandoned(&self, tx_id: TxId, abandoned: bool) -> Result<(), OutputManagerStorageError>;
/// Reinstate a cancelled inbound output
fn reinstate_cancelled_inbound_output(&self, tx_id: TxId) -> Result<(), OutputManagerStorageError>;
/// Return the available, time locked, pending incoming and pending outgoing balance
fn get_balance(
&self,
current_tip_for_time_lock_calculation: Option<u64>,
) -> Result<Balance, OutputManagerStorageError>;
}

/// Holds the state of the KeyManager being used by the Output Manager Service
Expand Down Expand Up @@ -274,71 +278,14 @@ where T: OutputManagerBackend + 'static
Ok(())
}

pub async fn get_balance(&self, current_chain_tip: Option<u64>) -> Result<Balance, OutputManagerStorageError> {
pub async fn get_balance(
&self,
current_tip_for_time_lock_calculation: Option<u64>,
) -> Result<Balance, OutputManagerStorageError> {
let db_clone = self.db.clone();
let db_clone2 = self.db.clone();
let db_clone3 = self.db.clone();
let db_clone4 = self.db.clone();

let unspent_outputs = tokio::task::spawn_blocking(move || match db_clone.fetch(&DbKey::UnspentOutputs) {
Ok(None) => log_error(
DbKey::UnspentOutputs,
OutputManagerStorageError::UnexpectedResult("Could not retrieve unspent outputs".to_string()),
),
Ok(Some(DbValue::UnspentOutputs(uo))) => Ok(uo),
Ok(Some(other)) => unexpected_result(DbKey::UnspentOutputs, other),
Err(e) => log_error(DbKey::UnspentOutputs, e),
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_incoming_outputs = tokio::task::spawn_blocking(move || db_clone2.fetch_pending_incoming_outputs())
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let pending_outgoing_outputs = tokio::task::spawn_blocking(move || db_clone3.fetch_pending_outgoing_outputs())
tokio::task::spawn_blocking(move || db_clone.get_balance(current_tip_for_time_lock_calculation))
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;

let time_locked_balance = if let Some(tip) = current_chain_tip {
let time_locked_outputs = tokio::task::spawn_blocking(move || {
db_clone4.fetch(&DbKey::TimeLockedUnspentOutputs(tip))?.ok_or_else(|| {
OutputManagerStorageError::UnexpectedResult("Time-locked Outputs cannot be retrieved".to_string())
})
})
.await
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))??;
if let DbValue::UnspentOutputs(time_locked_uo) = time_locked_outputs {
Some(
time_locked_uo
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value),
)
} else {
None
}
} else {
None
};

let available_balance = unspent_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_incoming = pending_incoming_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

let pending_outgoing = pending_outgoing_outputs
.iter()
.fold(MicroTari::from(0), |acc, x| acc + x.unblinded_output.value);

Ok(Balance {
available_balance,
time_locked_balance,
pending_incoming_balance: pending_incoming,
pending_outgoing_balance: pending_outgoing,
})
.map_err(|err| OutputManagerStorageError::BlockingTaskSpawnError(err.to_string()))?
}

/// This method is called when a transaction is built to be sent. It will encumber unspent outputs against a pending
Expand Down
Loading

0 comments on commit 95a7868

Please sign in to comment.