Skip to content

Commit

Permalink
Merge pull request #1104 from TheBlueMatt/2021-10-payment-id-in-monitors
Browse files Browse the repository at this point in the history
Reload pending outbound payments from ChannelMonitors on startup
  • Loading branch information
TheBlueMatt committed Oct 22, 2021
2 parents 107c6c7 + 7af5d12 commit 0a31c12
Show file tree
Hide file tree
Showing 8 changed files with 868 additions and 219 deletions.
95 changes: 95 additions & 0 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,101 @@ impl<Signer: Sign> ChannelMonitor<Signer> {

res
}

/// Gets the set of outbound HTLCs which are pending resolution in this channel.
/// This is used to reconstruct pending outbound payments on restart in the ChannelManager.
pub(crate) fn get_pending_outbound_htlcs(&self) -> HashMap<HTLCSource, HTLCOutputInCommitment> {
let mut res = HashMap::new();
let us = self.inner.lock().unwrap();

macro_rules! walk_htlcs {
($holder_commitment: expr, $htlc_iter: expr) => {
for (htlc, source) in $htlc_iter {
if us.htlcs_resolved_on_chain.iter().any(|v| Some(v.input_idx) == htlc.transaction_output_index) {
// We should assert that funding_spend_confirmed is_some() here, but we
// have some unit tests which violate HTLC transaction CSVs entirely and
// would fail.
// TODO: Once tests all connect transactions at consensus-valid times, we
// should assert here like we do in `get_claimable_balances`.
} else if htlc.offered == $holder_commitment {
// If the payment was outbound, check if there's an HTLCUpdate
// indicating we have spent this HTLC with a timeout, claiming it back
// and awaiting confirmations on it.
let htlc_update_confd = us.onchain_events_awaiting_threshold_conf.iter().any(|event| {
if let OnchainEvent::HTLCUpdate { input_idx: Some(input_idx), .. } = event.event {
// If the HTLC was timed out, we wait for ANTI_REORG_DELAY blocks
// before considering it "no longer pending" - this matches when we
// provide the ChannelManager an HTLC failure event.
Some(input_idx) == htlc.transaction_output_index &&
us.best_block.height() >= event.height + ANTI_REORG_DELAY - 1
} else if let OnchainEvent::HTLCSpendConfirmation { input_idx, .. } = event.event {
// If the HTLC was fulfilled with a preimage, we consider the HTLC
// immediately non-pending, matching when we provide ChannelManager
// the preimage.
Some(input_idx) == htlc.transaction_output_index
} else { false }
});
if !htlc_update_confd {
res.insert(source.clone(), htlc.clone());
}
}
}
}
}

// We're only concerned with the confirmation count of HTLC transactions, and don't
// actually care how many confirmations a commitment transaction may or may not have. Thus,
// we look for either a FundingSpendConfirmation event or a funding_spend_confirmed.
let confirmed_txid = us.funding_spend_confirmed.or_else(|| {
us.onchain_events_awaiting_threshold_conf.iter().find_map(|event| {
if let OnchainEvent::FundingSpendConfirmation { .. } = event.event {
Some(event.txid)
} else { None }
})
});
if let Some(txid) = confirmed_txid {
if Some(txid) == us.current_counterparty_commitment_txid || Some(txid) == us.prev_counterparty_commitment_txid {
walk_htlcs!(false, us.counterparty_claimable_outpoints.get(&txid).unwrap().iter().filter_map(|(a, b)| {
if let &Some(ref source) = b {
Some((a, &**source))
} else { None }
}));
} else if txid == us.current_holder_commitment_tx.txid {
walk_htlcs!(true, us.current_holder_commitment_tx.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
} else if let Some(prev_commitment) = &us.prev_holder_signed_commitment_tx {
if txid == prev_commitment.txid {
walk_htlcs!(true, prev_commitment.htlc_outputs.iter().filter_map(|(a, _, c)| {
if let Some(source) = c { Some((a, source)) } else { None }
}));
}
}
} else {
// If we have not seen a commitment transaction on-chain (ie the channel is not yet
// closed), just examine the available counterparty commitment transactions. See docs
// on `fail_unbroadcast_htlcs`, below, for justification.
macro_rules! walk_counterparty_commitment {
($txid: expr) => {
if let Some(ref latest_outpoints) = us.counterparty_claimable_outpoints.get($txid) {
for &(ref htlc, ref source_option) in latest_outpoints.iter() {
if let &Some(ref source) = source_option {
res.insert((**source).clone(), htlc.clone());
}
}
}
}
}
if let Some(ref txid) = us.current_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
if let Some(ref txid) = us.prev_counterparty_commitment_txid {
walk_counterparty_commitment!(txid);
}
}

res
}
}

/// Compares a broadcasted commitment transaction's HTLCs with those in the latest state,
Expand Down
108 changes: 88 additions & 20 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,29 @@ pub enum UpdateFulfillCommitFetch {
DuplicateClaim {},
}

/// The return value of `revoke_and_ack` on success, primarily updates to other channels or HTLC
/// state.
pub(super) struct RAAUpdates {
pub commitment_update: Option<msgs::CommitmentUpdate>,
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub monitor_update: ChannelMonitorUpdate,
pub holding_cell_failed_htlcs: Vec<(HTLCSource, PaymentHash)>,
}

/// The return value of `monitor_updating_restored`
pub(super) struct MonitorRestoreUpdates {
pub raa: Option<msgs::RevokeAndACK>,
pub commitment_update: Option<msgs::CommitmentUpdate>,
pub order: RAACommitmentOrder,
pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>,
pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
pub finalized_claimed_htlcs: Vec<HTLCSource>,
pub funding_broadcastable: Option<Transaction>,
pub funding_locked: Option<msgs::FundingLocked>,
}

/// If the majority of the channels funds are to the fundee and the initiator holds only just
/// enough funds to cover their reserve value, channels are at risk of getting "stuck". Because the
/// initiator controls the feerate, if they then go to increase the channel fee, they may have no
Expand Down Expand Up @@ -406,6 +429,7 @@ pub(super) struct Channel<Signer: Sign> {
monitor_pending_commitment_signed: bool,
monitor_pending_forwards: Vec<(PendingHTLCInfo, u64)>,
monitor_pending_failures: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
monitor_pending_finalized_fulfills: Vec<HTLCSource>,

// pending_update_fee is filled when sending and receiving update_fee.
//
Expand Down Expand Up @@ -692,6 +716,7 @@ impl<Signer: Sign> Channel<Signer> {
monitor_pending_commitment_signed: false,
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),

#[cfg(debug_assertions)]
holder_max_commitment_tx_output: Mutex::new((channel_value_satoshis * 1000 - push_msat, push_msat)),
Expand Down Expand Up @@ -955,6 +980,7 @@ impl<Signer: Sign> Channel<Signer> {
monitor_pending_commitment_signed: false,
monitor_pending_forwards: Vec::new(),
monitor_pending_failures: Vec::new(),
monitor_pending_finalized_fulfills: Vec::new(),

#[cfg(debug_assertions)]
holder_max_commitment_tx_output: Mutex::new((msg.push_msat, msg.funding_satoshis * 1000 - msg.push_msat)),
Expand Down Expand Up @@ -2711,7 +2737,7 @@ impl<Signer: Sign> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Option<msgs::CommitmentUpdate>, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>), ChannelError>
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<RAAUpdates, ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
Expand Down Expand Up @@ -2777,6 +2803,7 @@ impl<Signer: Sign> Channel<Signer> {
log_trace!(logger, "Updating HTLCs on receipt of RAA in channel {}...", log_bytes!(self.channel_id()));
let mut to_forward_infos = Vec::new();
let mut revoked_htlcs = Vec::new();
let mut finalized_claimed_htlcs = Vec::new();
let mut update_fail_htlcs = Vec::new();
let mut update_fail_malformed_htlcs = Vec::new();
let mut require_commitment = false;
Expand All @@ -2803,6 +2830,7 @@ impl<Signer: Sign> Channel<Signer> {
if let Some(reason) = fail_reason.clone() { // We really want take() here, but, again, non-mut ref :(
revoked_htlcs.push((htlc.source.clone(), htlc.payment_hash, reason));
} else {
finalized_claimed_htlcs.push(htlc.source.clone());
// They fulfilled, so we sent them money
value_to_self_msat_diff -= htlc.amount_msat as i64;
}
Expand Down Expand Up @@ -2899,8 +2927,14 @@ impl<Signer: Sign> Channel<Signer> {
}
self.monitor_pending_forwards.append(&mut to_forward_infos);
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
return Ok((None, Vec::new(), Vec::new(), monitor_update, Vec::new()))
return Ok(RAAUpdates {
commitment_update: None, finalized_claimed_htlcs: Vec::new(),
accepted_htlcs: Vec::new(), failed_htlcs: Vec::new(),
monitor_update,
holding_cell_failed_htlcs: Vec::new()
});
}

match self.free_holding_cell_htlcs(logger)? {
Expand All @@ -2919,7 +2953,14 @@ impl<Signer: Sign> Channel<Signer> {
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);

Ok((Some(commitment_update), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: Some(commitment_update),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos,
failed_htlcs: revoked_htlcs,
monitor_update,
holding_cell_failed_htlcs: htlcs_to_fail
})
},
(None, htlcs_to_fail) => {
if require_commitment {
Expand All @@ -2932,17 +2973,27 @@ impl<Signer: Sign> Channel<Signer> {

log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
Ok((Some(msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs,
update_fail_malformed_htlcs,
update_fee: None,
commitment_signed
}), to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: Some(msgs::CommitmentUpdate {
update_add_htlcs: Vec::new(),
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs,
update_fail_malformed_htlcs,
update_fee: None,
commitment_signed
}),
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
Ok((None, to_forward_infos, revoked_htlcs, monitor_update, htlcs_to_fail))
Ok(RAAUpdates {
commitment_update: None,
finalized_claimed_htlcs,
accepted_htlcs: to_forward_infos, failed_htlcs: revoked_htlcs,
monitor_update, holding_cell_failed_htlcs: htlcs_to_fail
})
}
}
}
Expand Down Expand Up @@ -3057,18 +3108,23 @@ impl<Signer: Sign> Channel<Signer> {
/// which failed. The messages which were generated from that call which generated the
/// monitor update failure must *not* have been sent to the remote end, and must instead
/// have been dropped. They will be regenerated when monitor_updating_restored is called.
pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool, mut pending_forwards: Vec<(PendingHTLCInfo, u64)>, mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>) {
pub fn monitor_update_failed(&mut self, resend_raa: bool, resend_commitment: bool,
mut pending_forwards: Vec<(PendingHTLCInfo, u64)>,
mut pending_fails: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>,
mut pending_finalized_claimed_htlcs: Vec<HTLCSource>
) {
self.monitor_pending_revoke_and_ack |= resend_raa;
self.monitor_pending_commitment_signed |= resend_commitment;
self.monitor_pending_forwards.append(&mut pending_forwards);
self.monitor_pending_failures.append(&mut pending_fails);
self.monitor_pending_finalized_fulfills.append(&mut pending_finalized_claimed_htlcs);
self.channel_state |= ChannelState::MonitorUpdateFailed as u32;
}

/// Indicates that the latest ChannelMonitor update has been committed by the client
/// successfully and we should restore normal operation. Returns messages which should be sent
/// to the remote side.
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> (Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Option<Transaction>, Option<msgs::FundingLocked>) where L::Target: Logger {
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> MonitorRestoreUpdates where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, ChannelState::MonitorUpdateFailed as u32);
self.channel_state &= !(ChannelState::MonitorUpdateFailed as u32);

Expand All @@ -3091,15 +3147,20 @@ impl<Signer: Sign> Channel<Signer> {
})
} else { None };

let mut forwards = Vec::new();
mem::swap(&mut forwards, &mut self.monitor_pending_forwards);
let mut failures = Vec::new();
mem::swap(&mut failures, &mut self.monitor_pending_failures);
let mut accepted_htlcs = Vec::new();
mem::swap(&mut accepted_htlcs, &mut self.monitor_pending_forwards);
let mut failed_htlcs = Vec::new();
mem::swap(&mut failed_htlcs, &mut self.monitor_pending_failures);
let mut finalized_claimed_htlcs = Vec::new();
mem::swap(&mut finalized_claimed_htlcs, &mut self.monitor_pending_finalized_fulfills);

if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 {
self.monitor_pending_revoke_and_ack = false;
self.monitor_pending_commitment_signed = false;
return (None, None, RAACommitmentOrder::RevokeAndACKFirst, forwards, failures, funding_broadcastable, funding_locked);
return MonitorRestoreUpdates {
raa: None, commitment_update: None, order: RAACommitmentOrder::RevokeAndACKFirst,
accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
};
}

let raa = if self.monitor_pending_revoke_and_ack {
Expand All @@ -3116,7 +3177,9 @@ impl<Signer: Sign> Channel<Signer> {
log_bytes!(self.channel_id()), if funding_broadcastable.is_some() { "a funding broadcastable, " } else { "" },
if commitment_update.is_some() { "a" } else { "no" }, if raa.is_some() { "an" } else { "no" },
match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
(raa, commitment_update, order, forwards, failures, funding_broadcastable, funding_locked)
MonitorRestoreUpdates {
raa, commitment_update, order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, funding_broadcastable, funding_locked
}
}

pub fn update_fee<F: Deref>(&mut self, fee_estimator: &F, msg: &msgs::UpdateFee) -> Result<(), ChannelError>
Expand Down Expand Up @@ -5176,6 +5239,7 @@ impl<Signer: Sign> Writeable for Channel<Signer> {
(5, self.config, required),
(7, self.shutdown_scriptpubkey, option),
(9, self.target_closing_feerate_sats_per_kw, option),
(11, self.monitor_pending_finalized_fulfills, vec_type),
});

Ok(())
Expand Down Expand Up @@ -5409,13 +5473,15 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>

let mut announcement_sigs = None;
let mut target_closing_feerate_sats_per_kw = None;
let mut monitor_pending_finalized_fulfills = Some(Vec::new());
read_tlv_fields!(reader, {
(0, announcement_sigs, option),
(1, minimum_depth, option),
(3, counterparty_selected_channel_reserve_satoshis, option),
(5, config, option), // Note that if none is provided we will *not* overwrite the existing one.
(7, shutdown_scriptpubkey, option),
(9, target_closing_feerate_sats_per_kw, option),
(11, monitor_pending_finalized_fulfills, vec_type),
});

let mut secp_ctx = Secp256k1::new();
Expand Down Expand Up @@ -5451,6 +5517,7 @@ impl<'a, Signer: Sign, K: Deref> ReadableArgs<&'a K> for Channel<Signer>
monitor_pending_commitment_signed,
monitor_pending_forwards,
monitor_pending_failures,
monitor_pending_finalized_fulfills: monitor_pending_finalized_fulfills.unwrap(),

pending_update_fee,
holding_cell_update_fee,
Expand Down Expand Up @@ -5700,6 +5767,7 @@ mod tests {
session_priv: SecretKey::from_slice(&hex::decode("0fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff").unwrap()[..]).unwrap(),
first_hop_htlc_msat: 548,
payment_id: PaymentId([42; 32]),
payment_secret: None,
}
});

Expand Down
Loading

0 comments on commit 0a31c12

Please sign in to comment.