From 47e0b834383ad7dcdb280c07cbf75bf486e06ee7 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Thu, 27 Jul 2023 11:43:31 +0200 Subject: [PATCH] Add quiescence negotiation (#2680) This change adds support for the quiescence negotiation protocol via the new `stfu` message. When a channel is quiescent, both sides will have the same set of signed htlc commitments and a splice can be performed without requiring the channel to be idle. An additional PR is still required to update our splice implementation to properly account for in-flight htlcs. Quiescence should currently only be enabled for compatibility testing. We send a warning and disconnect when a forbidden message is received during quiescence; a disconnect ends quiescence. If an htlc is fulfilled/failed while quiescent, any preimage will be relayed immediately and the update will be replayed when quiescence ends. We also send a warning and disconnect if both quiescence and splice negotiation are not complete before the quiescence timeout. --------- Co-authored-by: t-bast --- eclair-core/src/main/resources/reference.conf | 3 + .../main/scala/fr/acinq/eclair/Features.scala | 7 + .../scala/fr/acinq/eclair/NodeParams.scala | 3 +- .../fr/acinq/eclair/channel/ChannelData.scala | 39 +- .../eclair/channel/ChannelExceptions.scala | 4 + .../fr/acinq/eclair/channel/Commitments.scala | 13 +- .../fr/acinq/eclair/channel/fsm/Channel.scala | 267 +++++++-- .../channel/fsm/DualFundingHandlers.scala | 10 - .../protocol/LightningMessageCodecs.scala | 6 + .../wire/protocol/LightningMessageTypes.scala | 2 + .../scala/fr/acinq/eclair/TestConstants.scala | 6 +- .../ChannelStateTestsHelperMethods.scala | 4 + .../states/e/NormalQuiescentStateSpec.scala | 525 ++++++++++++++++++ .../states/e/NormalSplicesStateSpec.scala | 26 +- .../protocol/LightningMessageCodecsSpec.scala | 11 + 15 files changed, 827 insertions(+), 99 deletions(-) create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalQuiescentStateSpec.scala diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 926f566088..6b1622185b 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -69,6 +69,7 @@ eclair { option_route_blinding = disabled option_shutdown_anysegwit = optional option_dual_fund = disabled + option_quiesce = disabled option_onion_messages = optional option_channel_type = optional option_scid_alias = optional @@ -154,6 +155,8 @@ eclair { max-total-pending-channels-private-nodes = 99 // maximum number of pending channels we will accept from all private nodes channel-opener-whitelist = [] // a list of public keys; we will ignore rate limits on pending channels from these peers } + + quiescence-timeout = 1 minutes // maximum time we will stay quiescent (or wait to reach quiescence) before disconnecting } balance-check-interval = 1 hour diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala index 625d6c6c5d..20db6a6c87 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Features.scala @@ -247,6 +247,12 @@ object Features { val mandatory = 28 } + // TODO: this should also extend NodeFeature once the spec is finalized + case object Quiescence extends Feature with InitFeature { val rfcName = "option_quiesce" val mandatory = 34 } + case object OnionMessages extends Feature with InitFeature with NodeFeature
{ val rfcName = "option_onion_messages" val mandatory = 38 @@ -316,6 +322,7 @@ object Features { RouteBlinding, ShutdownAnySegwit, DualFunding, + Quiescence, OnionMessages, ChannelType, ScidAlias, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 3eab339cec..9b7ce7535c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -513,7 +513,8 @@ object NodeParams extends Logging { channelOpenerWhitelist = channelOpenerWhitelist, maxPendingChannelsPerPeer = maxPendingChannelsPerPeer, maxTotalPendingChannelsPrivateNodes = maxTotalPendingChannelsPrivateNodes, - remoteRbfLimits = Channel.RemoteRbfLimits(config.getInt("channel.funding.remote-rbf-limits.max-attempts"), config.getInt("channel.funding.remote-rbf-limits.attempt-delta-blocks")) + remoteRbfLimits = Channel.RemoteRbfLimits(config.getInt("channel.funding.remote-rbf-limits.max-attempts"), config.getInt("channel.funding.remote-rbf-limits.attempt-delta-blocks")), + quiescenceTimeout = FiniteDuration(config.getDuration("channel.quiescence-timeout").getSeconds, TimeUnit.SECONDS), ), onChainFeeConf = OnChainFeeConf( feeTargets = feeTargets, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index 59d6595716..1b997b9b0a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.io.Peer import fr.acinq.eclair.payment.OutgoingPaymentPacket.Upstream import fr.acinq.eclair.transactions.CommitmentSpec import fr.acinq.eclair.transactions.Transactions._ -import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} +import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelReady, ChannelReestablish, ChannelUpdate, ClosingSigned, CommitSig, FailureMessage, FundingCreated, FundingSigned, Init, OnionRoutingPacket, OpenChannel, OpenDualFundedChannel, Shutdown, SpliceInit, Stfu, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc} import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, UInt64} import scodec.bits.ByteVector @@ -187,13 +187,14 @@ sealed trait HasReplyToCommand extends Command { def replyTo: ActorRef } sealed trait HasOptionalReplyToCommand extends Command { def replyTo_opt: Option[ActorRef] } sealed trait ForbiddenCommandDuringSplice extends Command +sealed trait ForbiddenCommandDuringQuiescence extends Command -final case class CMD_ADD_HTLC(replyTo: ActorRef, amount: MilliSatoshi, paymentHash: ByteVector32, cltvExpiry: CltvExpiry, onion: OnionRoutingPacket, nextBlindingKey_opt: Option[PublicKey], origin: Origin.Hot, commit: Boolean = false) extends HasReplyToCommand with ForbiddenCommandDuringSplice -sealed trait HtlcSettlementCommand extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice { def id: Long } +final case class CMD_ADD_HTLC(replyTo: ActorRef, amount: MilliSatoshi, paymentHash: ByteVector32, cltvExpiry: CltvExpiry, 
onion: OnionRoutingPacket, nextBlindingKey_opt: Option[PublicKey], origin: Origin.Hot, commit: Boolean = false) extends HasReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence +sealed trait HtlcSettlementCommand extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence { def id: Long } final case class CMD_FULFILL_HTLC(id: Long, r: ByteVector32, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HtlcSettlementCommand final case class CMD_FAIL_HTLC(id: Long, reason: Either[ByteVector, FailureMessage], delay_opt: Option[FiniteDuration] = None, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HtlcSettlementCommand final case class CMD_FAIL_MALFORMED_HTLC(id: Long, onionHash: ByteVector32, failureCode: Int, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HtlcSettlementCommand -final case class CMD_UPDATE_FEE(feeratePerKw: FeeratePerKw, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice +final case class CMD_UPDATE_FEE(feeratePerKw: FeeratePerKw, commit: Boolean = false, replyTo_opt: Option[ActorRef] = None) extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence final case class CMD_SIGN(replyTo_opt: Option[ActorRef] = None) extends HasOptionalReplyToCommand with ForbiddenCommandDuringSplice final case class ClosingFees(preferred: Satoshi, min: Satoshi, max: Satoshi) @@ -202,7 +203,7 @@ final case class ClosingFeerates(preferred: FeeratePerKw, min: FeeratePerKw, max } sealed trait CloseCommand extends HasReplyToCommand -final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector], feerates: Option[ClosingFeerates]) extends CloseCommand with ForbiddenCommandDuringSplice +final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector], feerates: Option[ClosingFeerates]) extends CloseCommand with ForbiddenCommandDuringSplice with ForbiddenCommandDuringQuiescence final case class CMD_FORCECLOSE(replyTo: ActorRef) extends CloseCommand final case class CMD_BUMP_FUNDING_FEE(replyTo: akka.actor.typed.ActorRef[CommandResponse[CMD_BUMP_FUNDING_FEE]], targetFeerate: FeeratePerKw, lockTime: Long) extends Command @@ -450,12 +451,32 @@ object RbfStatus { } sealed trait SpliceStatus +/** We're waiting for the channel to be quiescent. */ +sealed trait QuiescenceNegotiation extends SpliceStatus +object QuiescenceNegotiation { + sealed trait Initiator extends QuiescenceNegotiation + sealed trait NonInitiator extends QuiescenceNegotiation +} +/** The channel is quiescent and a splice attempt was initiated. */ +sealed trait QuiescentSpliceStatus extends SpliceStatus object SpliceStatus { case object NoSplice extends SpliceStatus - case class SpliceRequested(cmd: CMD_SPLICE, init: SpliceInit) extends SpliceStatus - case class SpliceInProgress(cmd_opt: Option[CMD_SPLICE], splice: typed.ActorRef[InteractiveTxBuilder.Command], remoteCommitSig: Option[CommitSig]) extends SpliceStatus - case class SpliceWaitingForSigs(signingSession: InteractiveTxSigningSession.WaitingForSigs) extends SpliceStatus - case object SpliceAborted extends SpliceStatus + /** We stop sending new updates and wait for our updates to be added to the local and remote commitments. 
*/ + case class QuiescenceRequested(splice: CMD_SPLICE) extends QuiescenceNegotiation.Initiator + /** Our updates have been added to the local and remote commitments, we wait for our peer to do the same. */ + case class InitiatorQuiescent(splice: CMD_SPLICE) extends QuiescenceNegotiation.Initiator + /** Our peer has asked us to stop sending new updates and wait for our updates to be added to the local and remote commitments. */ + case class ReceivedStfu(stfu: Stfu) extends QuiescenceNegotiation.NonInitiator + /** Our updates have been added to the local and remote commitments, we wait for our peer to use the now quiescent channel. */ + case object NonInitiatorQuiescent extends QuiescentSpliceStatus + /** We told our peer we want to splice funds in the channel. */ + case class SpliceRequested(cmd: CMD_SPLICE, init: SpliceInit) extends QuiescentSpliceStatus + /** We both agreed to splice and are building the splice transaction. */ + case class SpliceInProgress(cmd_opt: Option[CMD_SPLICE], splice: typed.ActorRef[InteractiveTxBuilder.Command], remoteCommitSig: Option[CommitSig]) extends QuiescentSpliceStatus + /** The splice transaction has been negotiated, we're exchanging signatures. */ + case class SpliceWaitingForSigs(signingSession: InteractiveTxSigningSession.WaitingForSigs) extends QuiescentSpliceStatus + /** The splice attempt was aborted by us, we're waiting for our peer to ack. */ + case object SpliceAborted extends QuiescentSpliceStatus } sealed trait ChannelData extends PossiblyHarmful { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala index b75cae3c79..da8d8bb645 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelExceptions.scala @@ -71,6 +71,7 @@ case class InvalidCompleteInteractiveTx (override val channelId: Byte case class TooManyInteractiveTxRounds (override val channelId: ByteVector32) extends ChannelException(channelId, "too many messages exchanged during interactive tx construction") case class RbfAttemptAborted (override val channelId: ByteVector32) extends ChannelException(channelId, "rbf attempt aborted") case class SpliceAttemptAborted (override val channelId: ByteVector32) extends ChannelException(channelId, "splice attempt aborted") +case class SpliceAttemptTimedOut (override val channelId: ByteVector32) extends ChannelException(channelId, "splice attempt took too long, disconnecting") case class DualFundingAborted (override val channelId: ByteVector32) extends ChannelException(channelId, "dual funding aborted") case class UnexpectedInteractiveTxMessage (override val channelId: ByteVector32, msg: InteractiveTxMessage) extends ChannelException(channelId, s"unexpected interactive-tx message (${msg.getClass.getSimpleName})") case class UnexpectedFundingSignatures (override val channelId: ByteVector32) extends ChannelException(channelId, "unexpected funding signatures (tx_signatures)") @@ -84,6 +85,7 @@ case class InvalidRbfTxAbortNotAcked (override val channelId: Byte case class InvalidRbfAttemptsExhausted (override val channelId: ByteVector32, maxAttempts: Int) extends ChannelException(channelId, s"invalid rbf attempt: $maxAttempts/$maxAttempts attempts already published") case class InvalidRbfAttemptTooSoon (override val channelId: ByteVector32, previousAttempt: BlockHeight, nextAttempt: BlockHeight) extends ChannelException(channelId, s"invalid rbf 
attempt: last attempt made at block=$previousAttempt, next attempt available after block=$nextAttempt") case class InvalidSpliceTxAbortNotAcked (override val channelId: ByteVector32) extends ChannelException(channelId, "invalid splice attempt: our previous tx_abort has not been acked") +case class InvalidSpliceNotQuiescent (override val channelId: ByteVector32) extends ChannelException(channelId, "invalid splice attempt: the channel is not quiescent") case class InvalidRbfTxConfirmed (override val channelId: ByteVector32) extends ChannelException(channelId, "no need to rbf, transaction is already confirmed") case class InvalidRbfNonInitiator (override val channelId: ByteVector32) extends ChannelException(channelId, "cannot initiate rbf: we're not the initiator of this interactive-tx attempt") case class InvalidRbfZeroConf (override val channelId: ByteVector32) extends ChannelException(channelId, "cannot initiate rbf: we're using zero-conf for this interactive-tx attempt") @@ -136,4 +138,6 @@ case class InvalidFailureCode (override val channelId: Byte case class PleasePublishYourCommitment (override val channelId: ByteVector32) extends ChannelException(channelId, "please publish your local commitment") case class CommandUnavailableInThisState (override val channelId: ByteVector32, command: String, state: ChannelState) extends ChannelException(channelId, s"cannot execute command=$command in state=$state") case class ForbiddenDuringSplice (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while splicing") +case class ForbiddenDuringQuiescence (override val channelId: ByteVector32, command: String) extends ChannelException(channelId, s"cannot process $command while quiescent") +case class ConcurrentRemoteSplice (override val channelId: ByteVector32) extends ChannelException(channelId, "splice attempt canceled, remote initiated splice before us") // @formatter:on \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index f422aba91a..d39129e2a4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -117,7 +117,6 @@ case class ChannelParams(channelId: ByteVector32, } /** - * * @param localScriptPubKey local script pubkey (provided in CMD_CLOSE, as an upfront shutdown script, or set to the current final onchain script) * @return an exception if the provided script is not valid */ @@ -132,7 +131,6 @@ case class ChannelParams(channelId: ByteVector32, } /** - * * @param remoteScriptPubKey remote script included in a Shutdown message * @return an exception if the provided script is not valid */ @@ -144,6 +142,9 @@ case class ChannelParams(channelId: ByteVector32, else Right(remoteScriptPubKey) } + /** If both peers support quiescence, we have to exchange stfu when splicing. 
*/ + def useQuiescence: Boolean = Features.canUseFeature(localParams.initFeatures, remoteParams.initFeatures, Features.Quiescence) + } object ChannelParams { @@ -350,7 +351,7 @@ case class Commitment(fundingTxIndex: Long, } } - private def hasNoPendingHtlcs: Boolean = localCommit.spec.htlcs.isEmpty && remoteCommit.spec.htlcs.isEmpty && nextRemoteCommit_opt.isEmpty + def hasNoPendingHtlcs: Boolean = localCommit.spec.htlcs.isEmpty && remoteCommit.spec.htlcs.isEmpty && nextRemoteCommit_opt.isEmpty def hasNoPendingHtlcsOrFeeUpdate(changes: CommitmentChanges): Boolean = hasNoPendingHtlcs && (changes.localChanges.signed ++ changes.localChanges.acked ++ changes.remoteChanges.signed ++ changes.remoteChanges.acked).collectFirst { case _: UpdateFee => true }.isEmpty @@ -359,8 +360,6 @@ case class Commitment(fundingTxIndex: Long, changes.localChanges.all.exists(_.isInstanceOf[UpdateAddHtlc]) || changes.remoteChanges.all.exists(_.isInstanceOf[UpdateAddHtlc]) - def isIdle(changes: CommitmentChanges): Boolean = hasNoPendingHtlcs && changes.localChanges.all.isEmpty && changes.remoteChanges.all.isEmpty - def timedOutOutgoingHtlcs(currentHeight: BlockHeight): Set[UpdateAddHtlc] = { def expired(add: UpdateAddHtlc): Boolean = currentHeight >= add.cltvExpiry.blockHeight @@ -795,8 +794,10 @@ case class Commitments(params: ChannelParams, def add(commitment: Commitment): Commitments = copy(active = commitment +: active) // @formatter:off + def localIsQuiescent: Boolean = changes.localChanges.all.isEmpty + def remoteIsQuiescent: Boolean = changes.remoteChanges.all.isEmpty // HTLCs and pending changes are the same for all active commitments, so we don't need to loop through all of them. - def isIdle: Boolean = active.head.isIdle(changes) + def isQuiescent: Boolean = (params.useQuiescence || active.head.hasNoPendingHtlcs) && localIsQuiescent && remoteIsQuiescent def hasNoPendingHtlcsOrFeeUpdate: Boolean = active.head.hasNoPendingHtlcsOrFeeUpdate(changes) def hasPendingOrProposedHtlcs: Boolean = active.head.hasPendingOrProposedHtlcs(changes) def timedOutOutgoingHtlcs(currentHeight: BlockHeight): Set[UpdateAddHtlc] = active.head.timedOutOutgoingHtlcs(currentHeight) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index f9e1882b2b..ac4bae80db 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -22,7 +22,6 @@ import akka.actor.{Actor, ActorContext, ActorRef, FSM, OneForOneStrategy, Possib import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, SatoshiLong, Transaction} -import fr.acinq.eclair.Features.SplicePrototype import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse @@ -91,7 +90,8 @@ object Channel { channelOpenerWhitelist: Set[PublicKey], maxPendingChannelsPerPeer: Int, maxTotalPendingChannelsPrivateNodes: Int, - remoteRbfLimits: RemoteRbfLimits) { + remoteRbfLimits: RemoteRbfLimits, + quiescenceTimeout: FiniteDuration) { require(0 <= maxHtlcValueInFlightPercent && maxHtlcValueInFlightPercent <= 100, "max-htlc-value-in-flight-percent must be between 0 and 100") def minFundingSatoshis(announceChannel: Boolean): Satoshi = if (announceChannel) minFundingPublicSatoshis else minFundingPrivateSatoshis @@ -156,6 +156,9 @@ object 
Channel { // we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitly specify the peer to allow for testing) case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef) + // we will receive this message if we waited too long to reach quiescence, or stayed quiescent for too long (NB: we explicitly specify the peer to allow for testing) + case class QuiescenceTimeout(peer: ActorRef) + /** We don't immediately process [[CurrentBlockHeight]] to avoid herd effects */ case class ProcessCurrentBlockHeight(c: CurrentBlockHeight) @@ -363,19 +366,46 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with */ when(NORMAL)(handleExceptions { - case Event(c: ForbiddenCommandDuringSplice, d: DATA_NORMAL) if d.spliceStatus != SpliceStatus.NoSplice => + case Event(c: ForbiddenCommandDuringQuiescence, d: DATA_NORMAL) if d.spliceStatus.isInstanceOf[QuiescenceNegotiation] => + val error = ForbiddenDuringQuiescence(d.channelId, c.getClass.getSimpleName) + c match { + case c: CMD_ADD_HTLC => handleAddHtlcCommandError(c, error, Some(d.channelUpdate)) + // Htlc settlement commands are ignored and will be replayed when not quiescent. + // This could create issues if we're keeping htlcs that should be settled pending for too long, as they could timeout. + // That's why we have a timer to actively disconnect if we stay in quiescence for too long. + case _: HtlcSettlementCommand => stay() + case _ => handleCommandError(error, c) + } + + case Event(c: ForbiddenCommandDuringSplice, d: DATA_NORMAL) if d.spliceStatus.isInstanceOf[QuiescentSpliceStatus] => val error = ForbiddenDuringSplice(d.channelId, c.getClass.getSimpleName) c match { case c: CMD_ADD_HTLC => handleAddHtlcCommandError(c, error, Some(d.channelUpdate)) - // NB: the command cannot be an htlc settlement (fail/fulfill), because if we are splicing it means the channel is idle and has no htlcs + // Htlc settlement commands are ignored and will be replayed when not quiescent. + // This could create issues if we're keeping htlcs that should be settled pending for too long, as they could timeout. + // That's why we have a timer to actively disconnect if we're splicing for too long. + case _: HtlcSettlementCommand => stay() case _ => handleCommandError(error, c) } - case Event(msg: ForbiddenMessageDuringSplice, d: DATA_NORMAL) if d.spliceStatus != SpliceStatus.NoSplice && !d.spliceStatus.isInstanceOf[SpliceStatus.SpliceRequested] => - // In case of a race between our splice_init and a forbidden message from our peer, we accept their message, because - // we know they are going to reject our splice attempt + case Event(msg: ForbiddenMessageDuringSplice, d: DATA_NORMAL) if d.spliceStatus.isInstanceOf[QuiescentSpliceStatus] => + log.warning("received forbidden message {} during splicing with status {}", msg.getClass.getSimpleName, d.spliceStatus.getClass.getSimpleName) val error = ForbiddenDuringSplice(d.channelId, msg.getClass.getSimpleName) - handleLocalError(error, d, Some(msg)) + // We forward preimages as soon as possible to the upstream channel because it allows us to pull funds. + msg match { + case fulfill: UpdateFulfillHtlc => d.commitments.receiveFulfill(fulfill) match { + case Right((_, origin, htlc)) => relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill)) + case _ => () + } + case _ => () + } + // Instead of force-closing (which would cost us on-chain fees), we try to resolve this issue by disconnecting. 
+ // This will abort the splice attempt if it hasn't been signed yet, and restore the channel to a clean state. + // If the splice attempt was signed, it gives us an opportunity to re-exchange signatures on reconnection before + // the forbidden message. It also provides the opportunity for our peer to update their node to get rid of that + // bug and resume normal execution. + context.system.scheduler.scheduleOnce(1 second, peer, Peer.Disconnect(remoteNodeId)) + stay() sending Warning(d.channelId, error.getMessage) case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) if d.localShutdown.isDefined || d.remoteShutdown.isDefined => // note: spec would allow us to keep sending new htlcs after having received their shutdown (and not sent ours) @@ -537,7 +567,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with watchFundingConfirmed(signingSession.fundingTx.txId, minDepth_opt) val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice) - stay() using d1 storing() sending signingSession1.localSigs + stay() using d1 storing() sending signingSession1.localSigs calling endQuiescence(d1) } } case _ if d.commitments.params.channelFeatures.hasFeature(Features.DualFunding) && d.commitments.latest.localFundingStatus.signedTx_opt.isEmpty && commit.batchSize == 1 => @@ -562,7 +592,18 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.shortIds, commitments1)) } context.system.eventStream.publish(ChannelSignatureReceived(self, commitments1)) - stay() using d.copy(commitments = commitments1) storing() sending revocation + // If we're now quiescent, we may send our stfu message. + val (d1, toSend) = d.spliceStatus match { + case SpliceStatus.QuiescenceRequested(cmd) if commitments1.localIsQuiescent => + val stfu = Stfu(d.channelId, initiator = true) + (d.copy(commitments = commitments1, spliceStatus = SpliceStatus.InitiatorQuiescent(cmd)), Seq(revocation, stfu)) + case _: SpliceStatus.ReceivedStfu if commitments1.localIsQuiescent => + val stfu = Stfu(d.channelId, initiator = false) + (d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NonInitiatorQuiescent), Seq(revocation, stfu)) + case _ => + (d.copy(commitments = commitments1), Seq(revocation)) + } + stay() using d1 storing() sending toSend case Left(cause) => handleLocalError(cause, d, Some(commit)) } } @@ -784,54 +825,90 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with } case Event(cmd: CMD_SPLICE, d: DATA_NORMAL) => - d.spliceStatus match { - case SpliceStatus.NoSplice => - if (d.commitments.isIdle && d.commitments.params.remoteParams.initFeatures.hasFeature(SplicePrototype)) { - val parentCommitment = d.commitments.latest.commitment - val targetFeerate = nodeParams.onChainFeeConf.getFundingFeerate(nodeParams.currentFeerates) - val fundingContribution = InteractiveTxFunder.computeSpliceContribution( - isInitiator = true, - sharedInput = Multisig2of2Input(parentCommitment), - spliceInAmount = cmd.additionalLocalFunding, - spliceOut = cmd.spliceOutputs, - targetFeerate = targetFeerate) - if (parentCommitment.localCommit.spec.toLocal + fundingContribution < parentCommitment.localChannelReserve(d.commitments.params)) { - log.warning("cannot do splice: insufficient funds") - cmd.replyTo ! 
RES_FAILURE(cmd, InvalidSpliceRequest(d.channelId)) - stay() - } else if (cmd.spliceOut_opt.map(_.scriptPubKey).exists(!MutualClose.isValidFinalScriptPubkey(_, allowAnySegwit = true))) { - log.warning("cannot do splice: invalid splice-out script") - cmd.replyTo ! RES_FAILURE(cmd, InvalidSpliceRequest(d.channelId)) - stay() + if (d.commitments.params.remoteParams.initFeatures.hasFeature(Features.SplicePrototype)) { + d.spliceStatus match { + case SpliceStatus.NoSplice if d.commitments.params.useQuiescence => + startSingleTimer(QuiescenceTimeout.toString, QuiescenceTimeout(peer), nodeParams.channelConf.quiescenceTimeout) + if (d.commitments.localIsQuiescent) { + stay() using d.copy(spliceStatus = SpliceStatus.InitiatorQuiescent(cmd)) sending Stfu(d.channelId, initiator = true) } else { - log.info(s"initiating splice with local.in.amount=${cmd.additionalLocalFunding} local.in.push=${cmd.pushAmount} local.out.amount=${cmd.spliceOut_opt.map(_.amount).sum}") - val spliceInit = SpliceInit(d.channelId, - fundingContribution = fundingContribution, - lockTime = nodeParams.currentBlockHeight.toLong, - feerate = targetFeerate, - fundingPubKey = keyManager.fundingPublicKey(d.commitments.params.localParams.fundingKeyPath, parentCommitment.fundingTxIndex + 1).publicKey, - pushAmount = cmd.pushAmount, - requireConfirmedInputs = nodeParams.channelConf.requireConfirmedInputsForDualFunding - ) - stay() using d.copy(spliceStatus = SpliceStatus.SpliceRequested(cmd, spliceInit)) sending spliceInit + stay() using d.copy(spliceStatus = SpliceStatus.QuiescenceRequested(cmd)) } - } else { - log.warning("cannot initiate splice, channel is not idle or peer doesn't support splices") - cmd.replyTo ! RES_FAILURE(cmd, CommandUnavailableInThisState(d.channelId, "splice", NORMAL)) + case SpliceStatus.NoSplice if !d.commitments.params.useQuiescence => + initiateSplice(cmd, d) match { + case Left(f) => + cmd.replyTo ! RES_FAILURE(cmd, f) + stay() + case Right(spliceInit) => + stay() using d.copy(spliceStatus = SpliceStatus.SpliceRequested(cmd, spliceInit)) sending spliceInit + } + case _ => + log.warning("cannot initiate splice, another one is already in progress") + cmd.replyTo ! RES_FAILURE(cmd, InvalidSpliceAlreadyInProgress(d.channelId)) stay() + } + } else { + log.warning("cannot initiate splice, peer doesn't support splices") + cmd.replyTo ! RES_FAILURE(cmd, CommandUnavailableInThisState(d.channelId, "splice", NORMAL)) + stay() + } + + case Event(msg: Stfu, d: DATA_NORMAL) => + if (d.commitments.params.useQuiescence) { + if (d.commitments.remoteIsQuiescent) { + d.spliceStatus match { + case SpliceStatus.NoSplice => + startSingleTimer(QuiescenceTimeout.toString, QuiescenceTimeout(peer), nodeParams.channelConf.quiescenceTimeout) + if (d.commitments.localIsQuiescent) { + stay() using d.copy(spliceStatus = SpliceStatus.NonInitiatorQuiescent) sending Stfu(d.channelId, initiator = false) + } else { + stay() using d.copy(spliceStatus = SpliceStatus.ReceivedStfu(msg)) + } + case SpliceStatus.QuiescenceRequested(cmd) => + // We could keep track of our splice attempt and merge it with the remote splice instead of cancelling it. + // But this is an edge case that should rarely occur, so it's probably not worth the additional complexity. + log.warning("our peer initiated quiescence before us, cancelling our splice attempt") + cmd.replyTo !
RES_FAILURE(cmd, ConcurrentRemoteSplice(d.channelId)) + stay() using d.copy(spliceStatus = SpliceStatus.ReceivedStfu(msg)) + case SpliceStatus.InitiatorQuiescent(cmd) => + // if both sides send stfu at the same time, the quiescence initiator is the channel initiator + if (!msg.initiator || d.commitments.params.localParams.isInitiator) { + initiateSplice(cmd, d) match { + case Left(f) => + cmd.replyTo ! RES_FAILURE(cmd, f) + context.system.scheduler.scheduleOnce(2 second, peer, Peer.Disconnect(remoteNodeId)) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending Warning(d.channelId, f.getMessage) + case Right(spliceInit) => + stay() using d.copy(spliceStatus = SpliceStatus.SpliceRequested(cmd, spliceInit)) sending spliceInit + } + } else { + log.warning("concurrent stfu received and our peer is the channel initiator, cancelling our splice attempt") + cmd.replyTo ! RES_FAILURE(cmd, ConcurrentRemoteSplice(d.channelId)) + stay() using d.copy(spliceStatus = SpliceStatus.NonInitiatorQuiescent) + } + case _ => + log.warning("ignoring duplicate stfu") + stay() } - case _ => - log.warning("cannot initiate splice, another one is already in progress") - cmd.replyTo ! RES_FAILURE(cmd, InvalidSpliceAlreadyInProgress(d.channelId)) - stay() + } else { + log.warning("our peer sent stfu but is not quiescent") + // NB: we use a small delay to ensure we've sent our warning before disconnecting. + context.system.scheduler.scheduleOnce(2 second, peer, Peer.Disconnect(remoteNodeId)) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending Warning(d.channelId, InvalidSpliceNotQuiescent(d.channelId).getMessage) + } + } else { + log.warning("ignoring stfu because both peers do not advertise quiescence") + stay() } + case Event(_: QuiescenceTimeout, d: DATA_NORMAL) => handleQuiescenceTimeout(d) + case Event(msg: SpliceInit, d: DATA_NORMAL) => d.spliceStatus match { - case SpliceStatus.NoSplice => - if (!d.commitments.isIdle) { - log.info("rejecting splice request: channel not idle") - stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, InvalidSpliceRequest(d.channelId).getMessage) + case SpliceStatus.NoSplice | SpliceStatus.NonInitiatorQuiescent => + if (!d.commitments.isQuiescent) { + log.info("rejecting splice request: channel not quiescent") + stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, InvalidSpliceNotQuiescent(d.channelId).getMessage) } else if (msg.feerate < nodeParams.currentFeerates.minimum) { log.info("rejecting splice request: feerate too low") stay() using d.copy(spliceStatus = SpliceStatus.SpliceAborted) sending TxAbort(d.channelId, InvalidSpliceRequest(d.channelId).getMessage) @@ -870,7 +947,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with case SpliceStatus.SpliceAborted => log.info("rejecting splice attempt: our previous tx_abort was not acked") stay() sending Warning(d.channelId, InvalidSpliceTxAbortNotAcked(d.channelId).getMessage) - case _: SpliceStatus.SpliceRequested | _: SpliceStatus.SpliceInProgress | _: SpliceStatus.SpliceWaitingForSigs => + case _ => log.info("rejecting splice attempt: the current splice attempt must be completed or aborted first") stay() sending Warning(d.channelId, InvalidSpliceAlreadyInProgress(d.channelId).getMessage) } @@ -923,22 +1000,30 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with log.info("our peer aborted the splice attempt: ascii='{}' bin={}", msg.toAscii, msg.data) 
cmd_opt.foreach(cmd => cmd.replyTo ! RES_FAILURE(cmd, SpliceAttemptAborted(d.channelId))) txBuilder ! InteractiveTxBuilder.Abort - stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) calling endQuiescence(d) case SpliceStatus.SpliceWaitingForSigs(signingSession) => log.info("our peer aborted the splice attempt: ascii='{}' bin={}", msg.toAscii, msg.data) rollbackFundingAttempt(signingSession.fundingTx.tx, previousTxs = Seq.empty) // no splice rbf yet - stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) calling endQuiescence(d) case SpliceStatus.SpliceRequested(cmd, _) => log.info("our peer rejected our splice attempt: ascii='{}' bin={}", msg.toAscii, msg.data) cmd.replyTo ! RES_FAILURE(cmd, new RuntimeException(s"splice attempt rejected by our peer: ${msg.toAscii}")) - stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) calling endQuiescence(d) + case SpliceStatus.NonInitiatorQuiescent => + log.info("our peer aborted their own splice attempt: ascii='{}' bin={}", msg.toAscii, msg.data) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) calling endQuiescence(d) case SpliceStatus.SpliceAborted => log.debug("our peer acked our previous tx_abort") - stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) calling endQuiescence(d) case SpliceStatus.NoSplice => log.info("our peer wants to abort the splice, but we've already negotiated a splice transaction: ascii='{}' bin={}", msg.toAscii, msg.data) // We ack their tx_abort but we keep monitoring the funding transaction until it's confirmed or double-spent. stay() sending TxAbort(d.channelId, SpliceAttemptAborted(d.channelId).getMessage) + case _: QuiescenceNegotiation => + log.info("our peer aborted the splice during quiescence negotiation, disconnecting: ascii='{}' bin={}", msg.toAscii, msg.data) + // NB: we use a small delay to ensure we've sent our warning before disconnecting. 
+ context.system.scheduler.scheduleOnce(2 second, peer, Peer.Disconnect(remoteNodeId)) + stay() using d.copy(spliceStatus = SpliceStatus.NoSplice) sending Warning(d.channelId, UnexpectedInteractiveTxMessage(d.channelId, msg).getMessage) } case Event(msg: InteractiveTxBuilder.Response, d: DATA_NORMAL) => @@ -997,7 +1082,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with val commitments1 = d.commitments.add(signingSession1.commitment) val d1 = d.copy(commitments = commitments1, spliceStatus = SpliceStatus.NoSplice) log.info("publishing funding tx for channelId={} fundingTxId={}", d.channelId, signingSession1.fundingTx.sharedTx.txId) - stay() using d1 storing() sending signingSession1.localSigs calling publishFundingTx(signingSession1.fundingTx) + stay() using d1 storing() sending signingSession1.localSigs calling publishFundingTx(signingSession1.fundingTx) calling endQuiescence(d1) } case _ => // We may receive an outdated tx_signatures if the transaction is already confirmed. @@ -1053,7 +1138,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with if (d.commitments.changes.localChanges.proposed.collectFirst { case add: UpdateAddHtlc => add }.isDefined) { log.debug("updating channel_update announcement (reason=disabled)") val channelUpdate1 = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, scidForChannelUpdate(d), d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.params.maxHtlcAmount, isPrivate = !d.commitments.announceChannel, enable = false) - // NB: the htlcs stay() in the commitments.localChange, they will be cleaned up after reconnection + // NB: the htlcs stay in the commitments.localChange, they will be cleaned up after reconnection d.commitments.changes.localChanges.proposed.collect { case add: UpdateAddHtlc => relayer ! 
RES_ADD_SETTLED(d.commitments.originChannels(add.id), add, HtlcResult.DisconnectedBeforeSigned(channelUpdate1)) } @@ -2184,6 +2269,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with if (nextState == OFFLINE) { // we can cancel the timer, we are not expecting anything when disconnected cancelTimer(RevocationTimeout.toString) + cancelTimer(QuiescenceTimeout.toString) } sealed trait EmitLocalChannelEvent @@ -2525,6 +2611,73 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with proposedTx_opt.get.unsignedTx.copy(tx = tx) } + private def initiateSplice(cmd: CMD_SPLICE, d: DATA_NORMAL): Either[ChannelException, SpliceInit] = { + if (d.commitments.isQuiescent) { + val parentCommitment = d.commitments.latest.commitment + val targetFeerate = nodeParams.onChainFeeConf.getFundingFeerate(nodeParams.currentFeerates) + val fundingContribution = InteractiveTxFunder.computeSpliceContribution( + isInitiator = true, + sharedInput = Multisig2of2Input(parentCommitment), + spliceInAmount = cmd.additionalLocalFunding, + spliceOut = cmd.spliceOutputs, + targetFeerate = targetFeerate) + if (parentCommitment.localCommit.spec.toLocal + fundingContribution < parentCommitment.localChannelReserve(d.commitments.params)) { + log.warning("cannot do splice: insufficient funds") + Left(InvalidSpliceRequest(d.channelId)) + } else if (cmd.spliceOut_opt.map(_.scriptPubKey).exists(!MutualClose.isValidFinalScriptPubkey(_, allowAnySegwit = true))) { + log.warning("cannot do splice: invalid splice-out script") + Left(InvalidSpliceRequest(d.channelId)) + } else { + log.info(s"initiating splice with local.in.amount=${cmd.additionalLocalFunding} local.in.push=${cmd.pushAmount} local.out.amount=${cmd.spliceOut_opt.map(_.amount).sum}") + val spliceInit = SpliceInit(d.channelId, + fundingContribution = fundingContribution, + lockTime = nodeParams.currentBlockHeight.toLong, + feerate = targetFeerate, + fundingPubKey = keyManager.fundingPublicKey(d.commitments.params.localParams.fundingKeyPath, parentCommitment.fundingTxIndex + 1).publicKey, + pushAmount = cmd.pushAmount, + requireConfirmedInputs = nodeParams.channelConf.requireConfirmedInputsForDualFunding + ) + Right(spliceInit) + } + } else { + log.warning("cannot initiate splice, channel is not quiescent") + Left(InvalidSpliceNotQuiescent(d.channelId)) + } + } + + private def handleQuiescenceTimeout(d: DATA_NORMAL): State = { + if (d.spliceStatus == SpliceStatus.NoSplice) { + log.warning("quiescence timed out with no ongoing splice, did we forget to cancel the timer?") + stay() + } else { + log.warning("quiescence timed out in state {}, closing connection", d.spliceStatus.getClass.getSimpleName) + context.system.scheduler.scheduleOnce(2 second, peer, Peer.Disconnect(remoteNodeId)) + stay() sending Warning(d.channelId, SpliceAttemptTimedOut(d.channelId).getMessage) + } + } + + /** Get out of a quiescent state: if there are HTLCs in flight, re-emit pending settlement commands. */ + private def endQuiescence(d: DATA_NORMAL): Unit = { + // We cancel the quiescence timeout timer, otherwise it may fire during the next quiescence session. + cancelTimer(QuiescenceTimeout.toString) + if (d.commitments.hasPendingOrProposedHtlcs) { + PendingCommandsDb.getSettlementCommands(nodeParams.db.pendingCommands, d.channelId).foreach(self ! 
_) + } + } + + private def reportSpliceFailure(spliceStatus: SpliceStatus, f: Throwable): Unit = { + val cmd_opt = spliceStatus match { + case SpliceStatus.QuiescenceRequested(cmd) => Some(cmd) + case SpliceStatus.InitiatorQuiescent(cmd) => Some(cmd) + case SpliceStatus.SpliceRequested(cmd, _) => Some(cmd) + case SpliceStatus.SpliceInProgress(cmd_opt, txBuilder, _) => + txBuilder ! InteractiveTxBuilder.Abort + cmd_opt + case _ => None + } + cmd_opt.foreach(cmd => cmd.replyTo ! RES_FAILURE(cmd, f)) + } + override def mdc(currentMessage: Any): MDC = { val category_opt = LogCategory(currentMessage) val id = currentMessage match { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala index 05f9f8473a..98c277bf6e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala @@ -147,14 +147,4 @@ trait DualFundingHandlers extends CommonFundingHandlers { } } - def reportSpliceFailure(spliceStatus: SpliceStatus, f: Throwable): Unit = { - spliceStatus match { - case SpliceStatus.SpliceRequested(cmd, _) => cmd.replyTo ! RES_FAILURE(cmd, f) - case SpliceStatus.SpliceInProgress(cmd_opt, txBuilder, _) => - txBuilder ! InteractiveTxBuilder.Abort - cmd_opt.foreach(cmd => cmd.replyTo ! RES_FAILURE(cmd, f)) - case _ => () - } - } - } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala index 2fdaa0bc8b..35f9269fcc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecs.scala @@ -420,6 +420,11 @@ object LightningMessageCodecs { ("channelId" | bytes32) :: ("fundingTxid" | bytes32) :: ("tlvStream" | SpliceLockedTlv.spliceLockedTlvCodec)).as[SpliceLocked] + + val stfuCodec: Codec[Stfu] = ( + ("channelId" | bytes32) :: + ("initiator" | byte.xmap[Boolean](b => b != 0, b => if (b) 1 else 0))).as[Stfu] + // // @@ -431,6 +436,7 @@ object LightningMessageCodecs { val lightningMessageCodec = discriminated[LightningMessage].by(uint16) .typecase(1, warningCodec) + .typecase(2, stfuCodec) .typecase(16, initCodec) .typecase(17, errorCodec) .typecase(18, pingCodec) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala index 30ba59f7c8..6f6f932ba7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala @@ -278,6 +278,8 @@ case class ChannelReady(channelId: ByteVector32, val alias_opt: Option[Alias] = tlvStream.get[ShortChannelIdTlv].map(_.alias) } +case class Stfu(channelId: ByteVector32, initiator: Boolean) extends SetupMessage with HasChannelId + case class SpliceInit(channelId: ByteVector32, fundingContribution: Satoshi, feerate: FeeratePerKw, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 1ec95cb70e..21bd98694c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -131,7 
+131,8 @@ object TestConstants { channelOpenerWhitelist = Set.empty, maxPendingChannelsPerPeer = 3, maxTotalPendingChannelsPrivateNodes = 99, - remoteRbfLimits = RemoteRbfLimits(5, 0) + remoteRbfLimits = RemoteRbfLimits(5, 0), + quiescenceTimeout = 2 minutes ), onChainFeeConf = OnChainFeeConf( feeTargets = FeeTargets(funding = ConfirmationPriority.Medium, closing = ConfirmationPriority.Medium), @@ -290,7 +291,8 @@ object TestConstants { channelOpenerWhitelist = Set.empty, maxPendingChannelsPerPeer = 3, maxTotalPendingChannelsPrivateNodes = 99, - remoteRbfLimits = RemoteRbfLimits(5, 0) + remoteRbfLimits = RemoteRbfLimits(5, 0), + quiescenceTimeout = 2 minutes ), onChainFeeConf = OnChainFeeConf( feeTargets = FeeTargets(funding = ConfirmationPriority.Medium, closing = ConfirmationPriority.Medium), diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index 44fe231984..d2b16961a2 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -90,6 +90,8 @@ object ChannelStateTestsTags { val RejectRbfAttempts = "reject_rbf_attempts" /** If set, the non-initiator will require a 1-block delay between RBF attempts. */ val DelayRbfAttempts = "delay_rbf_attempts" + /** If set, peers will support the quiesce protocol. */ + val Quiescence = "quiescence" } trait ChannelStateTestsBase extends Assertions with Eventually { @@ -180,6 +182,7 @@ trait ChannelStateTestsBase extends Assertions with Eventually { .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.ScidAlias))(_.updated(Features.ScidAlias, FeatureSupport.Optional)) .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.DualFunding))(_.updated(Features.DualFunding, FeatureSupport.Optional)) .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.Splicing))(_.updated(Features.SplicePrototype, FeatureSupport.Optional)) + .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.Quiescence))(_.updated(Features.Quiescence, FeatureSupport.Optional)) .initFeatures() val bobInitFeatures = Bob.nodeParams.features .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.DisableWumbo))(_.removed(Features.Wumbo)) @@ -193,6 +196,7 @@ trait ChannelStateTestsBase extends Assertions with Eventually { .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.ScidAlias))(_.updated(Features.ScidAlias, FeatureSupport.Optional)) .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.DualFunding))(_.updated(Features.DualFunding, FeatureSupport.Optional)) .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.Splicing))(_.updated(Features.SplicePrototype, FeatureSupport.Optional)) + .modify(_.activated).usingIf(tags.contains(ChannelStateTestsTags.Quiescence))(_.updated(Features.Quiescence, FeatureSupport.Optional)) .initFeatures() val channelType = ChannelTypes.defaultFromFeatures(aliceInitFeatures, bobInitFeatures, announceChannel = channelFlags.announceChannel) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalQuiescentStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalQuiescentStateSpec.scala new file mode 100644 index 0000000000..476c7c46c5 --- /dev/null +++ 
b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalQuiescentStateSpec.scala @@ -0,0 +1,525 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.channel.states.e + +import akka.actor.typed.scaladsl.adapter.actorRefAdapter +import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script} +import fr.acinq.eclair.TestConstants.Bob +import fr.acinq.eclair.blockchain.CurrentBlockHeight +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.fsm.Channel +import fr.acinq.eclair.channel.fund.InteractiveTxBuilder +import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishTx} +import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.io.Peer +import fr.acinq.eclair.payment.relay.Relayer.RelayForward +import fr.acinq.eclair.wire.protocol._ +import fr.acinq.eclair.{channel, _} +import org.scalatest.Outcome +import org.scalatest.funsuite.FixtureAnyFunSuiteLike +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import scodec.bits.HexStringSyntax + +class NormalQuiescentStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase { + + type FixtureParam = SetupFixture + + implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging + + override def withFixture(test: OneArgTest): Outcome = { + val tags = test.tags + ChannelStateTestsTags.DualFunding + ChannelStateTestsTags.Splicing + ChannelStateTestsTags.Quiescence + val setup = init(tags = tags) + import setup._ + reachNormal(setup, tags) + alice2bob.ignoreMsg { case _: ChannelUpdate => true } + bob2alice.ignoreMsg { case _: ChannelUpdate => true } + awaitCond(alice.stateName == NORMAL) + awaitCond(bob.stateName == NORMAL) + withFixture(test.toNoArgTest(setup)) + } + + private def disconnect(f: FixtureParam): Unit = { + import f._ + alice ! INPUT_DISCONNECTED + bob ! INPUT_DISCONNECTED + awaitCond(alice.stateName == OFFLINE) + awaitCond(bob.stateName == OFFLINE) + } + + private def reconnect(f: FixtureParam): Unit = { + import f._ + + val aliceInit = Init(alice.stateData.asInstanceOf[ChannelDataWithCommitments].commitments.params.localParams.initFeatures) + val bobInit = Init(bob.stateData.asInstanceOf[ChannelDataWithCommitments].commitments.params.localParams.initFeatures) + alice ! INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit) + bob ! 
INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit) + alice2bob.expectMsgType[ChannelReestablish] + alice2bob.forward(bob) + bob2alice.expectMsgType[ChannelReestablish] + bob2alice.forward(alice) + + alice2blockchain.expectMsgType[WatchFundingDeeplyBuried] + bob2blockchain.expectMsgType[WatchFundingDeeplyBuried] + awaitCond(alice.stateName == NORMAL) + awaitCond(bob.stateName == NORMAL) + } + + private def safeSend(r: TestFSMRef[ChannelState, ChannelData, Channel], cmds: Seq[channel.Command]): Unit = { + cmds.foreach { + case cmd: channel.HtlcSettlementCommand => + // this would be done automatically when the relayer calls safeSend + r.underlyingActor.nodeParams.db.pendingCommands.addSettlementCommand(r.stateData.channelId, cmd) + r ! cmd + case cmd: channel.Command => r ! cmd + } + } + + private def initiateQuiescence(f: FixtureParam, sendInitialStfu: Boolean): TestProbe = { + import f._ + + val sender = TestProbe() + val scriptPubKey = Script.write(Script.pay2wpkh(randomKey().publicKey)) + val cmd = CMD_SPLICE(sender.ref, spliceIn_opt = Some(SpliceIn(500_000 sat)), spliceOut_opt = Some(SpliceOut(100_000 sat, scriptPubKey))) + alice ! cmd + alice2bob.expectMsgType[Stfu] + if (!sendInitialStfu) { + // only alice is quiescent, we're holding the first stfu to pause the splice + } else { + alice2bob.forward(bob) + bob2alice.expectMsgType[Stfu] + bob2alice.forward(alice) + alice2bob.expectMsgType[SpliceInit] + // both alice and bob are quiescent, we're holding the splice-init to pause the splice + } + + sender + } + + test("send stfu after pending local changes have been added") { f => + import f._ + // we have an unsigned htlc in our local changes + addHtlc(10_000 msat, alice, bob, alice2bob, bob2alice) + alice ! CMD_SPLICE(TestProbe().ref, spliceIn_opt = Some(SpliceIn(50_000 sat)), spliceOut_opt = None) + alice2bob.expectNoMessage(100 millis) + crossSign(alice, bob, alice2bob, bob2alice) + alice2bob.expectMsgType[Stfu] + assert(alice.stateData.asInstanceOf[ChannelDataWithCommitments].commitments.isQuiescent) + } + + test("recv stfu when there are pending local changes") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = false) + // we're holding the stfu from alice so that bob can add a pending local change + addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + // bob will not reply to alice's stfu until bob has no pending local commitment changes + alice2bob.forward(bob) + bob2alice.expectNoMessage(100 millis) + crossSign(bob, alice, bob2alice, alice2bob) + bob2alice.expectMsgType[Stfu] + assert(bob.stateData.asInstanceOf[ChannelDataWithCommitments].commitments.isQuiescent) + bob2alice.forward(alice) + // when both nodes are quiescent, alice will start the splice + alice2bob.expectMsgType[SpliceInit] + alice2bob.forward(bob) + bob2alice.expectMsgType[SpliceAck] + bob2alice.forward(alice) + } + + test("recv forbidden non-settlement commands while initiator awaiting stfu from remote") { f => + import f._ + // initiator should reject commands that change the commitment once it became quiescent + val sender1, sender2, sender3 = TestProbe() + val cmds = Seq( + CMD_ADD_HTLC(sender1.ref, 1_000_000 msat, randomBytes32(), CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, None, localOrigin(sender1.ref)), + CMD_UPDATE_FEE(FeeratePerKw(100 sat), replyTo_opt = Some(sender2.ref)), + CMD_CLOSE(sender3.ref, None, None) + ) + initiateQuiescence(f, sendInitialStfu = false) + safeSend(alice, cmds) + 
sender1.expectMsgType[RES_ADD_FAILED[ForbiddenDuringQuiescence]] + sender2.expectMsgType[RES_FAILURE[CMD_UPDATE_FEE, ForbiddenDuringQuiescence]] + sender3.expectMsgType[RES_FAILURE[CMD_CLOSE, ForbiddenDuringQuiescence]] + } + + test("recv forbidden non-settlement commands while quiescent") { f => + import f._ + // both should reject commands that change the commitment while quiescent + val sender1, sender2, sender3 = TestProbe() + val cmds = Seq( + CMD_ADD_HTLC(sender1.ref, 1_000_000 msat, randomBytes32(), CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), TestConstants.emptyOnionPacket, None, localOrigin(sender1.ref)), + CMD_UPDATE_FEE(FeeratePerKw(100 sat), replyTo_opt = Some(sender2.ref)), + CMD_CLOSE(sender3.ref, None, None) + ) + initiateQuiescence(f, sendInitialStfu = true) + safeSend(alice, cmds) + sender1.expectMsgType[RES_ADD_FAILED[ForbiddenDuringSplice]] + sender2.expectMsgType[RES_FAILURE[CMD_UPDATE_FEE, ForbiddenDuringSplice]] + sender3.expectMsgType[RES_FAILURE[CMD_CLOSE, ForbiddenDuringSplice]] + safeSend(bob, cmds) + sender1.expectMsgType[RES_ADD_FAILED[ForbiddenDuringSplice]] + sender2.expectMsgType[RES_FAILURE[CMD_UPDATE_FEE, ForbiddenDuringSplice]] + sender3.expectMsgType[RES_FAILURE[CMD_CLOSE, ForbiddenDuringSplice]] + } + + // @formatter:off + sealed trait SettlementCommandEnum + case object FulfillHtlc extends SettlementCommandEnum + case object FailHtlc extends SettlementCommandEnum + // @formatter:on + + private def receiveSettlementCommand(f: FixtureParam, c: SettlementCommandEnum, sendInitialStfu: Boolean, resetConnection: Boolean = false): Unit = { + import f._ + + val (preimage, add) = addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + val cmd = c match { + case FulfillHtlc => CMD_FULFILL_HTLC(add.id, preimage) + case FailHtlc => CMD_FAIL_HTLC(add.id, Left(randomBytes32())) + } + crossSign(bob, alice, bob2alice, alice2bob) + val sender = initiateQuiescence(f, sendInitialStfu) + safeSend(alice, Seq(cmd)) + // alice will not forward settlement msg to bob while quiescent + alice2bob.expectNoMessage(100 millis) + if (resetConnection) { + // both alice and bob leave quiescence when the channel is disconnected + disconnect(f) + } else if (sendInitialStfu) { + // alice sends splice-init to bob + alice2bob.forward(bob) + bob2alice.expectMsgType[SpliceAck] + // both alice and bob leave quiescence when the splice aborts + bob2alice.forward(alice, TxAbort(channelId(bob), hex"deadbeef")) + alice2bob.expectMsgType[TxAbort] + alice2bob.forward(bob) + bob2alice.expectMsgType[TxAbort] + } else { + // alice sends a warning and disconnects if bob doesn't send stfu before the timeout + alice !
Channel.QuiescenceTimeout(alicePeer.ref) + alice2bob.expectMsgType[Warning] + alice2bob.forward(bob) + // we should disconnect after giving bob time to receive the warning + alicePeer.fishForMessage(3 seconds) { + case Peer.Disconnect(nodeId, _) if nodeId == alice.underlyingActor.remoteNodeId => true + case _ => false + } + disconnect(f) + } + if (resetConnection || !sendInitialStfu) { + // any failure during quiescence will cause alice to disconnect + sender.expectMsgType[RES_FAILURE[_, _]] + assert(alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NoSplice) + reconnect(f) + } + // alice sends pending settlement after quiescence ends + c match { + case FulfillHtlc => alice2bob.expectMsgType[UpdateFulfillHtlc] + case FailHtlc => alice2bob.expectMsgType[UpdateFailHtlc] + } + } + + test("recv CMD_FULFILL_HTLC while initiator awaiting stfu from remote") { f => + receiveSettlementCommand(f, FulfillHtlc, sendInitialStfu = false) + } + + test("recv CMD_FAIL_HTLC while initiator awaiting stfu from remote") { f => + receiveSettlementCommand(f, FailHtlc, sendInitialStfu = false) + } + + test("recv CMD_FULFILL_HTLC while initiator awaiting stfu from remote and channel disconnects") { f => + receiveSettlementCommand(f, FulfillHtlc, sendInitialStfu = false, resetConnection = true) + } + + test("recv CMD_FAIL_HTLC while initiator awaiting stfu from remote and channel disconnects") { f => + receiveSettlementCommand(f, FailHtlc, sendInitialStfu = false, resetConnection = true) + } + + test("recv CMD_FULFILL_HTLC while quiescent") { f => + receiveSettlementCommand(f, FulfillHtlc, sendInitialStfu = true) + } + + test("recv CMD_FAIL_HTLC while quiescent") { f => + receiveSettlementCommand(f, FailHtlc, sendInitialStfu = true) + } + + test("recv CMD_FULFILL_HTLC while quiescent and channel disconnects") { f => + receiveSettlementCommand(f, FulfillHtlc, sendInitialStfu = true, resetConnection = true) + } + + test("recv CMD_FAIL_HTLC while quiescent and channel disconnects") { f => + receiveSettlementCommand(f, FailHtlc, sendInitialStfu = true, resetConnection = true) + } + + test("recv second stfu while non-initiator waiting for local commitment to be signed") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = false) + val (_, _) = addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + alice2bob.forward(bob) + // second stfu to bob is ignored + bob ! 
Stfu(channelId(bob), initiator = true) + bob2alice.expectNoMessage(100 millis) + } + + test("recv Shutdown message before initiator receives stfu from remote") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = false) + val bobData = bob.stateData.asInstanceOf[DATA_NORMAL] + val forbiddenMsg = Shutdown(channelId(bob), bob.underlyingActor.getOrGenerateFinalScriptPubKey(bobData)) + bob2alice.forward(alice, forbiddenMsg) + // handle Shutdown normally + alice2bob.expectMsgType[Shutdown] + } + + test("recv (forbidden) Shutdown message while quiescent") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = true) + val bobData = bob.stateData.asInstanceOf[DATA_NORMAL] + val forbiddenMsg = Shutdown(channelId(bob), bob.underlyingActor.getOrGenerateFinalScriptPubKey(bobData)) + // both parties will respond to a forbidden msg while quiescent with a warning (and disconnect) + bob2alice.forward(alice, forbiddenMsg) + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "Shutdown").getMessage)) + alice2bob.forward(bob, forbiddenMsg) + bob2alice.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "Shutdown").getMessage)) + } + + test("recv (forbidden) UpdateFulfillHtlc message while quiescent") { f => + import f._ + val (preimage, add) = addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + crossSign(bob, alice, bob2alice, alice2bob) + alice2relayer.expectMsg(RelayForward(add)) + initiateQuiescence(f, sendInitialStfu = true) + val forbiddenMsg = UpdateFulfillHtlc(channelId(bob), add.id, preimage) + // both parties will respond to a forbidden msg while quiescent with a warning (and disconnect) + bob2alice.forward(alice, forbiddenMsg) + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateFulfillHtlc").getMessage)) + alice2bob.forward(bob, forbiddenMsg) + bob2alice.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateFulfillHtlc").getMessage)) + assert(bob2relayer.expectMsgType[RES_ADD_SETTLED[_, HtlcResult.RemoteFulfill]].result.paymentPreimage == preimage) + } + + test("recv (forbidden) UpdateFailHtlc message while quiescent") { f => + import f._ + val (_, add) = addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + crossSign(bob, alice, bob2alice, alice2bob) + initiateQuiescence(f, sendInitialStfu = true) + val forbiddenMsg = UpdateFailHtlc(channelId(bob), add.id, randomBytes32()) + // both parties will respond to a forbidden msg while quiescent with a warning (and disconnect) + bob2alice.forward(alice, forbiddenMsg) + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateFailHtlc").getMessage)) + alice2bob.forward(bob, forbiddenMsg) + bob2alice.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateFailHtlc").getMessage)) + } + + test("recv (forbidden) UpdateFee message while quiescent") { f => + import f._ + val (_, _) = addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + crossSign(bob, alice, bob2alice, alice2bob) + initiateQuiescence(f, sendInitialStfu = true) + val forbiddenMsg = UpdateFee(channelId(bob), FeeratePerKw(500 sat)) + // both parties will respond to a forbidden msg while quiescent with a warning (and disconnect) + bob2alice.forward(alice, forbiddenMsg) + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateFee").getMessage)) + alice2bob.forward(bob, forbiddenMsg) + bob2alice.expectMsg(Warning(channelId(alice), 
ForbiddenDuringSplice(channelId(alice), "UpdateFee").getMessage)) + } + + test("recv (forbidden) UpdateAddHtlc message while quiescent") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = true) + // have to build an htlc manually because eclair would refuse to accept this command as it's forbidden + val forbiddenMsg = UpdateAddHtlc(channelId = randomBytes32(), id = 5656, amountMsat = 50_000_000 msat, cltvExpiry = CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), paymentHash = randomBytes32(), onionRoutingPacket = TestConstants.emptyOnionPacket, blinding_opt = None) + // both parties will respond to a forbidden msg while quiescent with a warning (and disconnect) + bob2alice.forward(alice, forbiddenMsg) + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateAddHtlc").getMessage)) + alice2bob.forward(bob, forbiddenMsg) + bob2alice.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateAddHtlc").getMessage)) + } + + test("recv stfu from splice initiator that is not quiescent") { f => + import f._ + addHtlc(10_000 msat, alice, bob, alice2bob, bob2alice) + alice2bob.forward(bob, Stfu(channelId(alice), initiator = true)) + bob2alice.expectMsg(Warning(channelId(bob), InvalidSpliceNotQuiescent(channelId(bob)).getMessage)) + // we should disconnect after giving alice time to receive the warning + bobPeer.fishForMessage(3 seconds) { + case Peer.Disconnect(nodeId, _) if nodeId == bob.underlyingActor.remoteNodeId => true + case _ => false + } + } + + test("recv stfu from splice non-initiator that is not quiescent") { f => + import f._ + addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + initiateQuiescence(f, sendInitialStfu = false) + alice2bob.forward(bob) + bob2alice.forward(alice, Stfu(channelId(bob), initiator = false)) + alice2bob.expectMsg(Warning(channelId(alice), InvalidSpliceNotQuiescent(channelId(alice)).getMessage)) + // we should disconnect after giving bob time to receive the warning + alicePeer.fishForMessage(3 seconds) { + case Peer.Disconnect(nodeId, _) if nodeId == alice.underlyingActor.remoteNodeId => true + case _ => false + } + } + + test("initiate quiescence concurrently (no pending changes)") { f => + import f._ + + val sender = TestProbe() + val cmd = CMD_SPLICE(sender.ref, spliceIn_opt = Some(SpliceIn(500_000 sat, pushAmount = 0 msat)), spliceOut_opt = None) + alice ! cmd + alice2bob.expectMsgType[Stfu] + bob ! cmd + bob2alice.expectMsgType[Stfu] + bob2alice.forward(alice) + alice2bob.forward(bob) + alice2bob.expectMsgType[SpliceInit] + eventually(assert(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NonInitiatorQuiescent)) + sender.expectMsgType[RES_FAILURE[CMD_SPLICE, ConcurrentRemoteSplice]] + } + + test("initiate quiescence concurrently (pending changes on initiator side)") { f => + import f._ + + addHtlc(10_000 msat, alice, bob, alice2bob, bob2alice) + val sender = TestProbe() + val cmd = CMD_SPLICE(sender.ref, spliceIn_opt = Some(SpliceIn(500_000 sat, pushAmount = 0 msat)), spliceOut_opt = None) + alice ! cmd + alice2bob.expectNoMessage(100 millis) // alice isn't quiescent yet + bob !
cmd + bob2alice.expectMsgType[Stfu] + bob2alice.forward(alice) + crossSign(alice, bob, alice2bob, bob2alice) + alice2bob.expectMsgType[Stfu] + alice2bob.forward(bob) + assert(alice.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NonInitiatorQuiescent) + sender.expectMsgType[RES_FAILURE[CMD_SPLICE, ConcurrentRemoteSplice]] + bob2alice.expectMsgType[SpliceInit] + } + + test("initiate quiescence concurrently (pending changes on non-initiator side)") { f => + import f._ + + addHtlc(10_000 msat, bob, alice, bob2alice, alice2bob) + val sender = TestProbe() + val cmd = CMD_SPLICE(sender.ref, spliceIn_opt = Some(SpliceIn(500_000 sat, pushAmount = 0 msat)), spliceOut_opt = None) + alice ! cmd + alice2bob.expectMsgType[Stfu] + bob ! cmd + bob2alice.expectNoMessage(100 millis) // bob isn't quiescent yet + alice2bob.forward(bob) + crossSign(bob, alice, bob2alice, alice2bob) + bob2alice.expectMsgType[Stfu] + bob2alice.forward(alice) + assert(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NonInitiatorQuiescent) + sender.expectMsgType[RES_FAILURE[CMD_SPLICE, ConcurrentRemoteSplice]] + alice2bob.expectMsgType[SpliceInit] + } + + test("htlc timeout during quiescence negotiation") { f => + import f._ + val (_, add) = addHtlc(50_000_000 msat, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + initiateQuiescence(f, sendInitialStfu = true) + + val aliceCommit = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localCommit + val commitTx = aliceCommit.commitTxAndRemoteSig.commitTx.tx + assert(aliceCommit.htlcTxsAndRemoteSigs.size == 1) + val htlcTimeoutTx = aliceCommit.htlcTxsAndRemoteSigs.head.htlcTx.tx + + // the HTLC times out, alice needs to close the channel + alice ! CurrentBlockHeight(add.cltvExpiry.blockHeight) + assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == commitTx.txid) + alice2blockchain.expectMsgType[PublishTx] // main delayed + assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == htlcTimeoutTx.txid) + assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid) + alice2blockchain.expectMsgType[WatchTxConfirmed] // main delayed + alice2blockchain.expectMsgType[WatchOutputSpent] // htlc output + alice2blockchain.expectNoMessage(100 millis) + + channelUpdateListener.expectMsgType[LocalChannelDown] + } + + test("htlc timeout during quiescence negotiation (with pending preimage)") { f => + import f._ + val (preimage, add) = addHtlc(50_000_000 msat, alice, bob, alice2bob, bob2alice) + crossSign(alice, bob, alice2bob, bob2alice) + initiateQuiescence(f, sendInitialStfu = true) + + val bobCommit = bob.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localCommit + val commitTx = bobCommit.commitTxAndRemoteSig.commitTx.tx + assert(bobCommit.htlcTxsAndRemoteSigs.size == 1) + val htlcSuccessTx = bobCommit.htlcTxsAndRemoteSigs.head.htlcTx.tx + + // bob receives the fulfill for htlc, which is ignored because the channel is quiescent + val fulfillHtlc = CMD_FULFILL_HTLC(add.id, preimage) + safeSend(bob, Seq(fulfillHtlc)) + + // the HTLC timeout from alice is near, bob needs to close the channel to avoid an on-chain race condition + bob ! 
CurrentBlockHeight(add.cltvExpiry.blockHeight - Bob.nodeParams.channelConf.fulfillSafetyBeforeTimeout.toInt) + // bob publishes a first set of force-close transactions + assert(bob2blockchain.expectMsgType[PublishFinalTx].tx.txid == commitTx.txid) + bob2blockchain.expectMsgType[PublishTx] // main delayed + assert(bob2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid) + bob2blockchain.expectMsgType[WatchTxConfirmed] // main delayed + bob2blockchain.expectMsgType[WatchOutputSpent] // htlc output + + // when transitioning to the closing state, bob checks the pending commands DB and replays the HTLC fulfill + assert(bob2blockchain.expectMsgType[PublishFinalTx].tx.txid == commitTx.txid) + bob2blockchain.expectMsgType[PublishTx] // main delayed + assert(bob2blockchain.expectMsgType[PublishFinalTx].tx.txid == htlcSuccessTx.txid) + assert(bob2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid) + bob2blockchain.expectMsgType[WatchTxConfirmed] // main delayed + bob2blockchain.expectMsgType[WatchOutputSpent] // htlc output + bob2blockchain.expectNoMessage(100 millis) + + channelUpdateListener.expectMsgType[LocalChannelDown] + } + + test("receive quiescence timeout while splice in progress") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = true) + + // alice sends splice-init to bob, bob responds with splice-ack + alice2bob.forward(bob) + bob2alice.expectMsgType[SpliceAck] + assert(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus.isInstanceOf[SpliceStatus.SpliceInProgress]) + + // bob sends a warning and disconnects if the splice takes too long to complete + bob ! Channel.QuiescenceTimeout(bobPeer.ref) + bob2alice.expectMsg(Warning(channelId(bob), SpliceAttemptTimedOut(channelId(bob)).getMessage)) + } + + test("receive quiescence timeout while splice is waiting for tx_abort from peer") { f => + import f._ + initiateQuiescence(f, sendInitialStfu = true) + + // bob sends tx-abort to alice but does not receive a tx-abort back from alice + alice2bob.forward(bob) + bob2alice.expectMsgType[SpliceAck] + assert(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus.isInstanceOf[SpliceStatus.SpliceInProgress]) + bob ! InteractiveTxBuilder.LocalFailure(new ChannelException(channelId(bob), "abort")) + bob2alice.expectMsgType[TxAbort] + assert(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.SpliceAborted) + + // bob sends a warning and disconnects if the splice takes too long to complete + bob ! Channel.QuiescenceTimeout(bobPeer.ref) + bob2alice.expectMsg(Warning(channelId(bob), SpliceAttemptTimedOut(channelId(bob)).getMessage)) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index d6e8c0fac3..98c27c5d00 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -649,21 +649,21 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice2bob.expectMsgType[SpliceInit] // we're holding the splice_init to create a race - val (preimage, cmdAdd: CMD_ADD_HTLC) = makeCmdAdd(5_000_000 msat, bob.underlyingActor.remoteNodeId, bob.underlyingActor.nodeParams.currentBlockHeight) + val (_, cmdAdd: CMD_ADD_HTLC) = makeCmdAdd(5_000_000 msat, bob.underlyingActor.remoteNodeId, bob.underlyingActor.nodeParams.currentBlockHeight) bob !
cmdAdd - val add = bob2alice.expectMsgType[UpdateAddHtlc] + bob2alice.expectMsgType[UpdateAddHtlc] bob2alice.forward(alice) // now we forward the splice_init alice2bob.forward(bob) - // this cancels the splice + // bob rejects the SpliceInit because they have a pending htlc bob2alice.expectMsgType[TxAbort] bob2alice.forward(alice) + // alice returns a warning and schedules a disconnect after receiving UpdateAddHtlc + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateAddHtlc").getMessage)) + // alice confirms the splice abort alice2bob.expectMsgType[TxAbort] - alice2bob.forward(bob) - // but the htlc goes through normally - crossSign(bob, alice, bob2alice, alice2bob) - fulfillHtlc(add.id, preimage, alice, bob, alice2bob, bob2alice) - crossSign(alice, bob, alice2bob, bob2alice) + // the htlc is not added + assert(!alice.stateData.asInstanceOf[DATA_NORMAL].commitments.hasPendingOrProposedHtlcs) } test("recv UpdateAddHtlc while a splice is in progress") { f => @@ -680,12 +680,10 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik // have to build a htlc manually because eclair would refuse to accept this command as it's forbidden val fakeHtlc = UpdateAddHtlc(channelId = randomBytes32(), id = 5656, amountMsat = 50000000 msat, cltvExpiry = CltvExpiryDelta(144).toCltvExpiry(currentBlockHeight), paymentHash = randomBytes32(), onionRoutingPacket = TestConstants.emptyOnionPacket, blinding_opt = None) bob2alice.forward(alice, fakeHtlc) - alice2bob.expectMsgType[Error] - assertPublished(alice2blockchain, "commit-tx") - assertPublished(alice2blockchain, "local-main-delayed") - alice2blockchain.expectMsgType[WatchTxConfirmed] - alice2blockchain.expectMsgType[WatchTxConfirmed] - alice2blockchain.expectNoMessage(100 millis) + // alice returns a warning and schedules a disconnect after receiving UpdateAddHtlc + alice2bob.expectMsg(Warning(channelId(alice), ForbiddenDuringSplice(channelId(alice), "UpdateAddHtlc").getMessage)) + // the htlc is not added + assert(!alice.stateData.asInstanceOf[DATA_NORMAL].commitments.hasPendingOrProposedHtlcs) } test("recv UpdateAddHtlc before splice confirms (zero-conf)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala index 1a08fa6f22..cc634f6b37 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/protocol/LightningMessageCodecsSpec.scala @@ -107,6 +107,17 @@ class LightningMessageCodecsSpec extends AnyFunSuite { } } + test("encode/decode stfu") { + val testCases = Seq( + Stfu(ByteVector32.One, initiator = true) -> hex"0002 0100000000000000000000000000000000000000000000000000000000000000 01", + Stfu(ByteVector32.One, initiator = false) -> hex"0002 0100000000000000000000000000000000000000000000000000000000000000 00", + ) + testCases.foreach { case (expected, bin) => + assert(lightningMessageCodec.encode(expected).require.bytes == bin) + assert(lightningMessageCodec.decode(bin.bits).require.value == expected) + } + } + test("nonreg generic tlv") { val channelId = randomBytes32() val signature = randomBytes64()