Skip to content

Commit

Permalink
Merge pull request #1178 from jkczyz/2021-11-payment-path-successful
Browse files Browse the repository at this point in the history
Generate PaymentPathSuccessful event for each path
  • Loading branch information
TheBlueMatt committed Nov 23, 2021
2 parents 2b78957 + 2c4f16d commit 19191b4
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 128 deletions.
1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
}
},
events::Event::PaymentSent { .. } => {},
events::Event::PaymentPathSuccessful { .. } => {},
events::Event::PaymentPathFailed { .. } => {},
events::Event::PaymentForwarded { .. } if $node == 1 => {},
events::Event::PendingHTLCsForwardable { .. } => {
Expand Down
63 changes: 27 additions & 36 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_second_revoke_and_ack);
assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
check_added_monitors!(nodes[0], 1);
expect_payment_path_successful!(nodes[0]);

expect_pending_htlcs_forwardable!(nodes[1]);

Expand Down Expand Up @@ -1090,12 +1091,12 @@ fn test_monitor_update_fail_reestablish() {
let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());

let (our_payment_preimage, _, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1000000);
let (payment_preimage, _, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1000000);

nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);

assert!(nodes[2].node.claim_funds(our_payment_preimage));
assert!(nodes[2].node.claim_funds(payment_preimage));
check_added_monitors!(nodes[2], 1);
let mut updates = get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id());
assert!(updates.update_add_htlcs.is_empty());
Expand Down Expand Up @@ -1159,13 +1160,7 @@ fn test_monitor_update_fail_reestablish() {
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
commitment_signed_dance!(nodes[0], nodes[1], updates.commitment_signed, false);

let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentSent { payment_preimage, .. } => assert_eq!(payment_preimage, our_payment_preimage),
_ => panic!("Unexpected event"),
}
expect_payment_sent!(nodes[0], payment_preimage);
}

#[test]
Expand Down Expand Up @@ -1300,7 +1295,7 @@ fn claim_while_disconnected_monitor_update_fail() {
let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;

// Forward a payment for B to claim
let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
let (payment_preimage_1, _, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);

nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
Expand Down Expand Up @@ -1395,16 +1390,7 @@ fn claim_while_disconnected_monitor_update_fail() {

nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_raa);
check_added_monitors!(nodes[0], 1);

let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentSent { ref payment_preimage, ref payment_hash, .. } => {
assert_eq!(*payment_preimage, payment_preimage_1);
assert_eq!(*payment_hash, payment_hash_1);
},
_ => panic!("Unexpected event"),
}
expect_payment_sent!(nodes[0], payment_preimage_1);

claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2);
}
Expand Down Expand Up @@ -1766,7 +1752,7 @@ fn monitor_update_claim_fail_no_response() {
let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;

// Forward a payment for B to claim
let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);
let (payment_preimage_1, _, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000);

// Now start forwarding a second payment, skipping the last RAA so B is in AwaitingRAA
let (route, payment_hash_2, payment_preimage_2, payment_secret_2) = get_route_and_payment_hash!(nodes[0], nodes[1], 1000000);
Expand Down Expand Up @@ -1802,16 +1788,7 @@ fn monitor_update_claim_fail_no_response() {
let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
commitment_signed_dance!(nodes[0], nodes[1], bs_updates.commitment_signed, false);

let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
Event::PaymentSent { ref payment_preimage, ref payment_hash, .. } => {
assert_eq!(*payment_preimage, payment_preimage_1);
assert_eq!(*payment_hash, payment_hash_1);
},
_ => panic!("Unexpected event"),
}
expect_payment_sent!(nodes[0], payment_preimage_1);

claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_2);
}
Expand Down Expand Up @@ -2323,7 +2300,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[0].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
expect_payment_sent!(nodes[1], payment_preimage_0);
expect_payment_sent_without_paths!(nodes[1], payment_preimage_0);
assert_eq!(updates.update_add_htlcs.len(), 1);
nodes[1].node.handle_update_add_htlc(&nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
updates.commitment_signed
Expand All @@ -2342,7 +2319,18 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {

commitment_signed_dance!(nodes[1], nodes[0], (), false, true, false);

expect_pending_htlcs_forwardable!(nodes[1]);
let events = nodes[1].node.get_and_clear_pending_events();
assert_eq!(events.len(), 2);
match events[0] {
Event::PendingHTLCsForwardable { .. } => { },
_ => panic!("Unexpected event"),
};
match events[1] {
Event::PaymentPathSuccessful { .. } => { },
_ => panic!("Unexpected event"),
};

nodes[1].node.process_pending_htlc_forwards();
expect_payment_received!(nodes[1], payment_hash_2, payment_secret_2, 100000);

claim_payment(&nodes[0], &[&nodes[1]], payment_preimage_1);
Expand Down Expand Up @@ -2427,9 +2415,10 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
bs_updates = Some(get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()));
assert_eq!(bs_updates.as_ref().unwrap().update_fulfill_htlcs.len(), 1);
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.as_ref().unwrap().update_fulfill_htlcs[0]);
expect_payment_sent!(nodes[0], payment_preimage);
expect_payment_sent_without_paths!(nodes[0], payment_preimage);
if htlc_status == HTLCStatusAtDupClaim::Cleared {
commitment_signed_dance!(nodes[0], nodes[1], &bs_updates.as_ref().unwrap().commitment_signed, false);
expect_payment_path_successful!(nodes[0]);
}
} else {
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
Expand All @@ -2453,10 +2442,11 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
bs_updates = Some(get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id()));
assert_eq!(bs_updates.as_ref().unwrap().update_fulfill_htlcs.len(), 1);
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.as_ref().unwrap().update_fulfill_htlcs[0]);
expect_payment_sent!(nodes[0], payment_preimage);
expect_payment_sent_without_paths!(nodes[0], payment_preimage);
}
if htlc_status != HTLCStatusAtDupClaim::Cleared {
commitment_signed_dance!(nodes[0], nodes[1], &bs_updates.as_ref().unwrap().commitment_signed, false);
expect_payment_path_successful!(nodes[0]);
}
}

Expand Down Expand Up @@ -2620,7 +2610,7 @@ fn double_temp_error() {
assert_eq!(node_id, nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &update_fulfill_1);
check_added_monitors!(nodes[0], 0);
expect_payment_sent!(nodes[0], payment_preimage_1);
expect_payment_sent_without_paths!(nodes[0], payment_preimage_1);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &commitment_signed_b1);
check_added_monitors!(nodes[0], 1);
nodes[0].node.process_pending_htlc_forwards();
Expand Down Expand Up @@ -2658,6 +2648,7 @@ fn double_temp_error() {
};
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &raa_b2);
check_added_monitors!(nodes[0], 1);
expect_payment_path_successful!(nodes[0]);

nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &update_fulfill_2);
check_added_monitors!(nodes[0], 0);
Expand Down
99 changes: 76 additions & 23 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ pub(crate) enum PendingOutboundPayment {
/// and add a pending payment that was already fulfilled.
Fulfilled {
session_privs: HashSet<[u8; 32]>,
payment_hash: Option<PaymentHash>,
},
}

Expand All @@ -475,23 +476,32 @@ impl PendingOutboundPayment {
}
}

fn payment_hash(&self) -> Option<PaymentHash> {
match self {
PendingOutboundPayment::Legacy { .. } => None,
PendingOutboundPayment::Retryable { payment_hash, .. } => Some(*payment_hash),
PendingOutboundPayment::Fulfilled { payment_hash, .. } => *payment_hash,
}
}

fn mark_fulfilled(&mut self) {
let mut session_privs = HashSet::new();
core::mem::swap(&mut session_privs, match self {
PendingOutboundPayment::Legacy { session_privs } |
PendingOutboundPayment::Retryable { session_privs, .. } |
PendingOutboundPayment::Fulfilled { session_privs }
PendingOutboundPayment::Fulfilled { session_privs, .. }
=> session_privs
});
*self = PendingOutboundPayment::Fulfilled { session_privs };
let payment_hash = self.payment_hash();
*self = PendingOutboundPayment::Fulfilled { session_privs, payment_hash };
}

/// panics if path is None and !self.is_fulfilled
fn remove(&mut self, session_priv: &[u8; 32], path: Option<&Vec<RouteHop>>) -> bool {
let remove_res = match self {
PendingOutboundPayment::Legacy { session_privs } |
PendingOutboundPayment::Retryable { session_privs, .. } |
PendingOutboundPayment::Fulfilled { session_privs } => {
PendingOutboundPayment::Fulfilled { session_privs, .. } => {
session_privs.remove(session_priv)
}
};
Expand Down Expand Up @@ -532,7 +542,7 @@ impl PendingOutboundPayment {
match self {
PendingOutboundPayment::Legacy { session_privs } |
PendingOutboundPayment::Retryable { session_privs, .. } |
PendingOutboundPayment::Fulfilled { session_privs } => {
PendingOutboundPayment::Fulfilled { session_privs, .. } => {
session_privs.len()
}
}
Expand Down Expand Up @@ -3493,14 +3503,23 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
}

fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
let mut pending_events = self.pending_events.lock().unwrap();
for source in sources.drain(..) {
if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
if let HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } = source {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
assert!(payment.get().is_fulfilled());
payment.get_mut().remove(&session_priv_bytes, None);
if payment.get_mut().remove(&session_priv_bytes, None) {
pending_events.push(
events::Event::PaymentPathSuccessful {
payment_id,
payment_hash: payment.get().payment_hash(),
path,
}
);
}
if payment.get().remaining_parts() == 0 {
payment.remove();
}
Expand All @@ -3517,32 +3536,43 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
session_priv_bytes.copy_from_slice(&session_priv[..]);
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
let found_payment = !payment.get().is_fulfilled();
let fee_paid_msat = payment.get().get_pending_fee_msat();
payment.get_mut().mark_fulfilled();
let mut pending_events = self.pending_events.lock().unwrap();
if !payment.get().is_fulfilled() {
let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
let fee_paid_msat = payment.get().get_pending_fee_msat();
pending_events.push(
events::Event::PaymentSent {
payment_id: Some(payment_id),
payment_preimage,
payment_hash,
fee_paid_msat,
}
);
payment.get_mut().mark_fulfilled();
}

if from_onchain {
// We currently immediately remove HTLCs which were fulfilled on-chain.
// This could potentially lead to removing a pending payment too early,
// with a reorg of one block causing us to re-add the fulfilled payment on
// restart.
// TODO: We should have a second monitor event that informs us of payments
// irrevocably fulfilled.
payment.get_mut().remove(&session_priv_bytes, Some(&path));
if payment.get_mut().remove(&session_priv_bytes, Some(&path)) {
let payment_hash = Some(PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()));
pending_events.push(
events::Event::PaymentPathSuccessful {
payment_id,
payment_hash,
path,
}
);
}

if payment.get().remaining_parts() == 0 {
payment.remove();
}
}
if found_payment {
let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
self.pending_events.lock().unwrap().push(
events::Event::PaymentSent {
payment_id: Some(payment_id),
payment_preimage,
payment_hash: payment_hash,
fee_paid_msat,
}
);
}
} else {
log_trace!(self.logger, "Received duplicative fulfill for HTLC with payment_preimage {}", log_bytes!(payment_preimage.0));
}
Expand Down Expand Up @@ -4631,6 +4661,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
pub fn has_pending_payments(&self) -> bool {
!self.pending_outbound_payments.lock().unwrap().is_empty()
}

#[cfg(test)]
pub fn clear_pending_payments(&self) {
self.pending_outbound_payments.lock().unwrap().clear()
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
Expand Down Expand Up @@ -5555,6 +5590,7 @@ impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
},
(1, Fulfilled) => {
(0, session_privs, required),
(1, payment_hash, option),
},
(2, Retryable) => {
(0, session_privs, required),
Expand Down Expand Up @@ -6323,9 +6359,10 @@ mod tests {
nodes[0].node.handle_revoke_and_ack(&nodes[1].node.get_our_node_id(), &bs_third_raa);
check_added_monitors!(nodes[0], 1);

// Note that successful MPP payments will generate 1 event upon the first path's success. No
// further events will be generated for subsequence path successes.
// Note that successful MPP payments will generate a single PaymentSent event upon the first
// path's success and a PaymentPathSuccessful event for each path's success.
let events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 3);
match events[0] {
Event::PaymentSent { payment_id: ref id, payment_preimage: ref preimage, payment_hash: ref hash, .. } => {
assert_eq!(Some(payment_id), *id);
Expand All @@ -6334,6 +6371,22 @@ mod tests {
},
_ => panic!("Unexpected event"),
}
match events[1] {
Event::PaymentPathSuccessful { payment_id: ref actual_payment_id, ref payment_hash, ref path } => {
assert_eq!(payment_id, *actual_payment_id);
assert_eq!(our_payment_hash, *payment_hash.as_ref().unwrap());
assert_eq!(route.paths[0], *path);
},
_ => panic!("Unexpected event"),
}
match events[2] {
Event::PaymentPathSuccessful { payment_id: ref actual_payment_id, ref payment_hash, ref path } => {
assert_eq!(payment_id, *actual_payment_id);
assert_eq!(our_payment_hash, *payment_hash.as_ref().unwrap());
assert_eq!(route.paths[0], *path);
},
_ => panic!("Unexpected event"),
}
}

#[test]
Expand Down
Loading

0 comments on commit 19191b4

Please sign in to comment.