Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(UTXO): balance event streaming for Electrum clients #2013

Merged
merged 23 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
18dc21b
[WIP] save development state
onur-ozkan Nov 23, 2023
58f35c5
implement scripthash channels between electrum and utxo coin
onur-ozkan Nov 23, 2023
6ab27b6
subscribe to scripthashes
onur-ozkan Nov 24, 2023
5866d3e
[wip] fetch balances when scripthash triggered
onur-ozkan Nov 24, 2023
7f9cb9b
[wip] broadcast balances when they change
onur-ozkan Nov 24, 2023
1abd13f
proper balance event errors on UTXO activations
onur-ozkan Nov 27, 2023
88894e7
create ScripthashNotification types
onur-ozkan Nov 27, 2023
fb864df
add doc-comment for scripthash_notification_sender
onur-ozkan Nov 27, 2023
4d00657
avoid unwrap in `fn scripthash_notification_sender`
onur-ozkan Nov 27, 2023
4831076
do pass by ref for ScripthashNotificationSender
onur-ozkan Nov 27, 2023
a4563f8
impl `get_scripthash_notification_handlers` for MmArc
onur-ozkan Nov 27, 2023
6ab47b2
fix WASM functions
onur-ozkan Nov 27, 2023
4bac4fa
broadcast multiple balances at once
onur-ozkan Nov 27, 2023
37e8a8d
move `SubscriptionNotification` to top in `ElectrumRpcResponseEnum`
onur-ozkan Nov 27, 2023
98d6553
fix review notes
onur-ozkan Nov 29, 2023
aa11435
handle disconnections, new addresses and mpsc capacity
onur-ozkan Nov 30, 2023
c3e264d
fix new address handling TODO
onur-ozkan Nov 30, 2023
255b27e
fix `test_get_new_address` and `test_scan_for_new_addresses` tests
onur-ozkan Nov 30, 2023
681f039
handle subscriptions from rpc_client connections
onur-ozkan Dec 1, 2023
2c3d03c
create utxo common function `address_to_scripthash`
onur-ozkan Dec 5, 2023
cc3dfac
add `prepare_addresses_for_balance_stream_if_enabled` into `HDWalletB…
onur-ozkan Dec 5, 2023
8357033
update `utxo_prepare_addresses_for_balance_stream_if_enabled`
onur-ozkan Dec 5, 2023
a290522
send scripthash event instead of calling rpcs
onur-ozkan Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion mm2src/coins/coin_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::compat::Future01CompatExt;
use mm2_err_handle::prelude::*;
use mm2_number::BigDecimal;
#[cfg(test)] use mocktopus::macros::*;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::{fmt, iter};

Expand Down Expand Up @@ -321,6 +321,12 @@ pub trait HDWalletBalanceOps: HDWalletCoinOps {
let balance = self.known_address_balance(address).await?;
Ok(AddressBalanceStatus::Used(balance))
}

/// Prepares addresses for real time balance streaming if coin balance event is enabled.
async fn prepare_addresses_for_balance_stream_if_enabled(
&self,
addresses: HashSet<Self::Address>,
) -> MmResult<(), String>;
}

#[async_trait]
Expand Down
23 changes: 19 additions & 4 deletions mm2src/coins/rpc_command/get_new_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub enum GetNewAddressRpcError {
RpcInvalidResponse(String),
#[display(fmt = "HD wallet storage error: {_0}")]
WalletStorageError(String),
#[display(fmt = "Failed scripthash subscription. Error: {_0}")]
FailedScripthashSubscription(String),
#[from_trait(WithTimeout::timeout)]
#[display(fmt = "RPC timed out {_0:?}")]
Timeout(Duration),
Expand Down Expand Up @@ -183,6 +185,7 @@ impl HttpStatusCode for GetNewAddressRpcError {
GetNewAddressRpcError::Transport(_)
| GetNewAddressRpcError::RpcInvalidResponse(_)
| GetNewAddressRpcError::WalletStorageError(_)
| GetNewAddressRpcError::FailedScripthashSubscription(_)
| GetNewAddressRpcError::HwError(_)
| GetNewAddressRpcError::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
GetNewAddressRpcError::Timeout(_) => StatusCode::REQUEST_TIMEOUT,
Expand Down Expand Up @@ -380,8 +383,10 @@ pub(crate) mod common_impl {
use super::*;
use crate::coin_balance::{HDAddressBalanceScanner, HDWalletBalanceOps};
use crate::hd_wallet::{HDAccountOps, HDWalletCoinOps, HDWalletOps};
use crate::utxo::UtxoCommonOps;
use crate::{CoinWithDerivationMethod, HDAddress};
use crypto::RpcDerivationPath;
use std::collections::HashSet;
use std::fmt;
use std::ops::DerefMut;

Expand Down Expand Up @@ -435,9 +440,12 @@ pub(crate) mod common_impl {
) -> MmResult<GetNewAddressResponse, GetNewAddressRpcError>
where
ConfirmAddress: HDConfirmAddress,
Coin:
HDWalletBalanceOps + CoinWithDerivationMethod<HDWallet = <Coin as HDWalletCoinOps>::HDWallet> + Send + Sync,
<Coin as HDWalletCoinOps>::Address: fmt::Display,
Coin: UtxoCommonOps
+ HDWalletBalanceOps
+ CoinWithDerivationMethod<HDWallet = <Coin as HDWalletCoinOps>::HDWallet>
+ Send
+ Sync,
<Coin as HDWalletCoinOps>::Address: fmt::Display + Into<keys::Address> + std::hash::Hash + std::cmp::Eq,
{
let hd_wallet = coin.derivation_method().hd_wallet_or_err()?;

Expand All @@ -462,9 +470,16 @@ pub(crate) mod common_impl {
.await?;

let balance = coin.known_address_balance(&address).await?;

let address_as_string = address.to_string();

coin.prepare_addresses_for_balance_stream_if_enabled(HashSet::from([address]))
.await
.map_err(|e| GetNewAddressRpcError::FailedScripthashSubscription(e.to_string()))?;

Ok(GetNewAddressResponse {
new_address: HDAddressBalance {
address: address.to_string(),
address: address_as_string,
derivation_path: RpcDerivationPath(derivation_path),
chain,
balance,
Expand Down
3 changes: 3 additions & 0 deletions mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub enum HDAccountBalanceRpcError {
WalletStorageError(String),
#[display(fmt = "Electrum/Native RPC invalid response: {}", _0)]
RpcInvalidResponse(String),
#[display(fmt = "Failed scripthash subscription. Error: {_0}")]
FailedScripthashSubscription(String),
#[display(fmt = "Transport: {}", _0)]
Transport(String),
#[display(fmt = "Internal: {}", _0)]
Expand All @@ -44,6 +46,7 @@ impl HttpStatusCode for HDAccountBalanceRpcError {
HDAccountBalanceRpcError::Transport(_)
| HDAccountBalanceRpcError::WalletStorageError(_)
| HDAccountBalanceRpcError::RpcInvalidResponse(_)
| HDAccountBalanceRpcError::FailedScripthashSubscription(_)
| HDAccountBalanceRpcError::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
Expand Down
21 changes: 18 additions & 3 deletions mm2src/coins/rpc_command/init_scan_for_new_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,23 @@ pub mod common_impl {
use super::*;
use crate::coin_balance::HDWalletBalanceOps;
use crate::hd_wallet::{HDAccountOps, HDWalletCoinOps, HDWalletOps};
use crate::utxo::UtxoCommonOps;
use crate::CoinWithDerivationMethod;
use std::fmt;
use keys::Address;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::str::FromStr;

pub async fn scan_for_new_addresses_rpc<Coin>(
coin: &Coin,
params: ScanAddressesParams,
) -> MmResult<ScanAddressesResponse, HDAccountBalanceRpcError>
where
Coin: CoinWithDerivationMethod<HDWallet = <Coin as HDWalletCoinOps>::HDWallet> + HDWalletBalanceOps + Sync,
<Coin as HDWalletCoinOps>::Address: fmt::Display,
Coin: UtxoCommonOps
+ CoinWithDerivationMethod<HDWallet = <Coin as HDWalletCoinOps>::HDWallet>
+ HDWalletBalanceOps
+ Sync,
HashSet<<Coin as HDWalletCoinOps>::Address>: From<HashSet<keys::Address>>,
{
let hd_wallet = coin.derivation_method().hd_wallet_or_err()?;

Expand All @@ -157,6 +163,15 @@ pub mod common_impl {
.scan_for_new_addresses(hd_wallet, hd_account.deref_mut(), &address_scanner, gap_limit)
.await?;

let addresses: HashSet<_> = new_addresses
.iter()
.map(|address_balance| Address::from_str(&address_balance.address).expect("Valid address"))
.collect();

coin.prepare_addresses_for_balance_stream_if_enabled(addresses.into())
.await
.map_err(|e| HDAccountBalanceRpcError::FailedScripthashSubscription(e.to_string()))?;

Ok(ScanAddressesResponse {
account_index: account_id,
derivation_path: RpcDerivationPath(account_derivation_path),
Expand Down
18 changes: 12 additions & 6 deletions mm2src/coins/tendermint/tendermint_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl EventBehaviour for TendermintCoin {
})
.collect();

let mut balance_updates = vec![];
for denom in denoms {
if let Some((ticker, decimals)) = self.active_ticker_and_decimals_from_denom(&denom) {
let balance_denom = match self.account_balance_for_denom(&self.account_id, denom).await {
Expand All @@ -139,17 +140,22 @@ impl EventBehaviour for TendermintCoin {
}

if broadcast {
let payload = json!({
balance_updates.push(json!({
"ticker": ticker,
"balance": { "spendable": balance_decimal, "unspendable": BigDecimal::default() }
});

ctx.stream_channel_controller
.broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string()))
.await;
}));
}
}
}

if !balance_updates.is_empty() {
ctx.stream_channel_controller
.broadcast(Event::new(
Self::EVENT_NAME.to_string(),
json!(balance_updates).to_string(),
))
.await;
}
}
}
}
Expand Down
19 changes: 17 additions & 2 deletions mm2src/coins/utxo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod rpc_clients;
pub mod slp;
pub mod spv;
pub mod swap_proto_v2_scripts;
pub mod utxo_balance_events;
pub mod utxo_block_header_storage;
pub mod utxo_builder;
pub mod utxo_common;
Expand All @@ -55,7 +56,7 @@ use crypto::{Bip32DerPathOps, Bip32Error, Bip44Chain, ChildNumber, DerivationPat
StandardHDCoinAddress, StandardHDPathError, StandardHDPathToAccount, StandardHDPathToCoin};
use derive_more::Display;
#[cfg(not(target_arch = "wasm32"))] use dirs::home_dir;
use futures::channel::mpsc::{Receiver as AsyncReceiver, Sender as AsyncSender, UnboundedSender};
use futures::channel::mpsc::{Receiver as AsyncReceiver, Sender as AsyncSender, UnboundedReceiver, UnboundedSender};
use futures::compat::Future01CompatExt;
use futures::lock::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
use futures01::Future;
Expand All @@ -65,7 +66,7 @@ pub use keys::{Address, AddressFormat as UtxoAddressFormat, AddressHashEnum, Key
Type as ScriptType};
#[cfg(not(target_arch = "wasm32"))]
use lightning_invoice::Currency as LightningCurrency;
use mm2_core::mm_ctx::MmArc;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
Expand Down Expand Up @@ -142,6 +143,15 @@ pub type MatureUnspentMap = HashMap<Address, MatureUnspentList>;
pub type RecentlySpentOutPointsGuard<'a> = AsyncMutexGuard<'a, RecentlySpentOutPoints>;
pub type UtxoHDAddress = HDAddress<Address, Public>;

pub enum ScripthashNotification {
Triggered(String),
SubscribeToAddresses(HashSet<Address>),
RefreshSubscriptions,
}

pub type ScripthashNotificationSender = Option<UnboundedSender<ScripthashNotification>>;
type ScripthashNotificationHandler = Option<Arc<AsyncMutex<UnboundedReceiver<ScripthashNotification>>>>;

#[cfg(windows)]
#[cfg(not(target_arch = "wasm32"))]
fn get_special_folder_path() -> PathBuf {
Expand Down Expand Up @@ -610,6 +620,11 @@ pub struct UtxoCoinFields {
/// This abortable system is used to spawn coin's related futures that should be aborted on coin deactivation
/// and on [`MmArc::stop`].
pub abortable_system: AbortableQueue,
pub(crate) ctx: MmWeak,
/// This is used for balance event streaming implementation for UTXOs.
/// If balance event streaming isn't enabled, this value will always be `None`; otherwise,
/// it will be used for receiving scripthash notifications to re-fetch balances.
scripthash_notification_handler: ScripthashNotificationHandler,
shamardy marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Display)]
Expand Down
8 changes: 8 additions & 0 deletions mm2src/coins/utxo/qtum.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::utxo_common::utxo_prepare_addresses_for_balance_stream_if_enabled;
use super::*;
use crate::coin_balance::{self, EnableCoinBalanceError, EnabledCoinBalanceParams, HDAccountBalance, HDAddressBalance,
HDWalletBalance, HDWalletBalanceOps};
Expand Down Expand Up @@ -1149,6 +1150,13 @@ impl HDWalletBalanceOps for QtumCoin {
) -> BalanceResult<Vec<(Self::Address, CoinBalance)>> {
utxo_common::addresses_balances(self, addresses).await
}

async fn prepare_addresses_for_balance_stream_if_enabled(
&self,
addresses: HashSet<Self::Address>,
) -> MmResult<(), String> {
utxo_prepare_addresses_for_balance_stream_if_enabled(self, addresses).await
}
}

impl HDWalletCoinWithStorageOps for QtumCoin {
Expand Down
Loading
Loading