Skip to content

Commit

Permalink
Asynchronously clean up obsolete HTLC info after splice
Browse files Browse the repository at this point in the history
When a splice transaction confirms, all the revoked commitment transactions
that only applied to the previous funding transaction cannot be published
anymore, because the previous funding output has already been spent.

We can thus forget all the historical HTLCs that were included in those
commitments, because we will never need to generate the corresponding
penalty transactions.

This ensures that the growth of our DB is bounded, and will shrink every
time a splice transaction is confirmed.

Fixes #2740
  • Loading branch information
t-bast committed Jan 18, 2024
1 parent 01be7b4 commit ed12d7b
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ sealed trait LocalFundingStatus {
def signedTx_opt: Option[Transaction]
/** We store local signatures for the purpose of retransmitting if the funding/splicing flow is interrupted. */
def localSigs_opt: Option[TxSignatures]
/**
* Commitment index of the first remote commitment we signed that spends this funding transaction.
* Once the funding transaction confirms, our peer won't be able to publish revoked commitments with lower indices.
*/
def firstCommitIndex_opt: Option[Long]
}
object LocalFundingStatus {
sealed trait NotLocked extends LocalFundingStatus
Expand All @@ -431,15 +436,16 @@ object LocalFundingStatus {
*/
case class SingleFundedUnconfirmedFundingTx(signedTx_opt: Option[Transaction]) extends UnconfirmedFundingTx with NotLocked {
override val localSigs_opt: Option[TxSignatures] = None
override val firstCommitIndex_opt: Option[Long] = None
}
case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams) extends UnconfirmedFundingTx with NotLocked {
case class DualFundedUnconfirmedFundingTx(sharedTx: SignedSharedTransaction, createdAt: BlockHeight, fundingParams: InteractiveTxParams, firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with NotLocked {
override val signedTx_opt: Option[Transaction] = sharedTx.signedTx_opt
override val localSigs_opt: Option[TxSignatures] = Some(sharedTx.localSigs)
}
case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends UnconfirmedFundingTx with Locked {
case class ZeroconfPublishedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends UnconfirmedFundingTx with Locked {
override val signedTx_opt: Option[Transaction] = Some(tx)
}
case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures]) extends LocalFundingStatus with Locked {
case class ConfirmedFundingTx(tx: Transaction, localSigs_opt: Option[TxSignatures], firstCommitIndex_opt: Option[Long]) extends LocalFundingStatus with Locked {
override val signedTx_opt: Option[Transaction] = Some(tx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,11 @@ case class Commitments(params: ChannelParams,
all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.localSigs_opt)
}

/** Return the index of the first remote commitment we signed spending the given funding transaction. */
def firstCommitIndex(fundingTxId: TxId): Option[Long] = {
all.find(_.fundingTxId == fundingTxId).flatMap(_.localFundingStatus.firstCommitIndex_opt)
}

/**
* Update the local/remote funding status
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with

case Event(msg: TxSignatures, d: DATA_NORMAL) =>
d.commitments.latest.localFundingStatus match {
case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) if fundingTx.txId == msg.txId =>
case dfu@LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) if fundingTx.txId == msg.txId =>
// we already sent our tx_signatures
InteractiveTxSigningSession.addRemoteSigs(keyManager, d.commitments.params, dfu.fundingParams, fundingTx, msg) match {
case Left(cause) =>
Expand Down Expand Up @@ -1138,7 +1138,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
}

case Event(w: WatchPublishedTriggered, d: DATA_NORMAL) =>
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
watchFundingConfirmed(w.tx.txid, Some(nodeParams.channelConf.minDepthBlocks), delay_opt = None)
Expand Down Expand Up @@ -1857,7 +1857,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case d: DATA_NORMAL => d.spliceStatus match {
case SpliceStatus.SpliceWaitingForSigs(status) => Set(ChannelReestablishTlv.NextFundingTlv(status.fundingTx.txId))
case _ => d.commitments.latest.localFundingStatus match {
case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId))
case LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx: PartiallySignedSharedTransaction, _, _, _) => Set(ChannelReestablishTlv.NextFundingTlv(fundingTx.txId))
case _ => Set.empty
}
}
Expand Down Expand Up @@ -2241,11 +2241,11 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// slightly before us. In that case, the WatchConfirmed may trigger first, and it would be inefficient to let the
// WatchPublished override our funding status: it will make us set a new WatchConfirmed that will instantly
// trigger and rewrite the funding status again.
val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _) => tx }.exists(_.txid == w.tx.txid)
val alreadyConfirmed = d.commitments.active.map(_.localFundingStatus).collect { case LocalFundingStatus.ConfirmedFundingTx(tx, _, _) => tx }.exists(_.txid == w.tx.txid)
if (alreadyConfirmed) {
stay()
} else {
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info(s"zero-conf funding txid=${w.tx.txid} has been published")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers {

case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
// we still watch the funding tx for confirmation even if we can use the zero-conf channel right away
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
stay() using d.copy(deferred = Some(remoteChannelReady)) // no need to store, they will re-send if we get disconnected

case Event(w: WatchPublishedTriggered, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) =>
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None)
val fundingStatus = LocalFundingStatus.ZeroconfPublishedFundingTx(w.tx, None, None)
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus) match {
case Right((commitments1, _)) =>
log.info("funding txid={} was successfully published for zero-conf channelId={}", w.tx.txid, d.channelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees
import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL}
import fr.acinq.eclair.db.RevokedHtlcInfoCleaner
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand Down Expand Up @@ -81,8 +83,14 @@ trait CommonFundingHandlers extends CommonHandlers {
}
case _ => () // in the dual-funding case, we have already verified the funding tx
}
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid), d.commitments.firstCommitIndex(w.tx.txid))
context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx))
// When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the
// previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can
// clean-up the htlc data that we were storing for the matching penalty transactions.
fundingStatus.firstCommitIndex_opt.foreach {
commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, commitIndex))
}
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>
// First of all, we watch the funding tx that is now confirmed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ object InteractiveTxSigningSession {
val localPerCommitmentPoint = nodeParams.channelKeyManager.commitmentPoint(channelKeyPath, localCommitIndex)
LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit =>
if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) {
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index))
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
SendingSigs(fundingStatus, commitment, fundingTx.localSigs)
} else {
Expand All @@ -988,7 +988,7 @@ object InteractiveTxSigningSession {
Left(f)
case Right(fullySignedTx) =>
log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams, Some(remoteCommit.index))
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond}
import fr.acinq.eclair.channel.PersistentChannelData
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.{CltvExpiry, Paginated}

trait ChannelsDb {

Expand All @@ -30,8 +30,13 @@ trait ChannelsDb {

def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit

/** Mark a channel as closed, but keep it in the DB. */
def removeChannel(channelId: ByteVector32): Unit

/** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */
def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit

/** Remove up to batchSize obsolete revoked HTLC information. */
def removeHtlcInfos(batchSize: Int): Unit

def listLocalChannels(): Seq[PersistentChannelData]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
runAsync(secondary.forgetHtlcInfos(channelId, beforeCommitIndex))
primary.forgetHtlcInfos(channelId, beforeCommitIndex)
}

override def removeHtlcInfos(batchSize: Int): Unit = {
runAsync(secondary.removeHtlcInfos(batchSize))
primary.removeHtlcInfos(batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,37 @@
package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.scalacompat.ByteVector32

import scala.concurrent.duration.FiniteDuration

/**
* When a channel is closed, we can remove the information about old HTLCs that was stored in the DB to punish revoked commitments.
* We potentially have millions of rows to delete per channel, and there is no rush to remove them.
* We don't want this to negatively impact active channels, so this actor deletes that data in small batches, at regular intervals.
* When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was
* stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there
* is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data
* in small batches, at regular intervals.
*/
object RevokedHtlcInfoCleaner {

// @formatter:off
sealed trait Command
case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command
private case object DeleteBatch extends Command
// @formatter:on

case class Config(batchSize: Int, interval: FiniteDuration)

def apply(db: ChannelsDb, config: Config): Behavior[Command] = {
Behaviors.setup { _ =>
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.self)
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
db.forgetHtlcInfos(channelId, beforeCommitIndex)
Behaviors.same
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
Behaviors.same
Expand Down
Loading

0 comments on commit ed12d7b

Please sign in to comment.