Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously clean up obsolete HTLC info from DB #2705

Merged
merged 5 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ eclair {
migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary)
compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary)
}
// During normal channel operation, we need to store information about past HTLCs to be able to punish our peer if
// they publish a revoked commitment. Once a channel closes or a splice transaction confirms, we can clean up past
// data (which reduces the size of our DB). Since there may be millions of rows to delete and we don't want to slow
// down the node, we delete those rows in batches at regular intervals.
revoked-htlc-info-cleaner {
// Number of rows to delete per batch: a higher value will clean up the DB faster, but may have a higher impact on performance.
batch-size = 50000
// Frequency at which batches of rows are deleted: a lower value will clean up the DB faster, but may have a higher impact on performance.
interval = 15 minutes
}
}

file-backup {
Expand Down
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogThreshold: Int,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration]) {
purgeInvoicesInterval: Option[FiniteDuration],
revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -604,7 +605,11 @@ object NodeParams extends Logging {
timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS),
maxAttempts = config.getInt("onion-messages.max-attempts"),
),
purgeInvoicesInterval = purgeInvoicesInterval
purgeInvoicesInterval = purgeInvoicesInterval,
revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(
batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fr.acinq.eclair.channel

import akka.event.LoggingAdapter
import com.softwaremill.quicklens.ModifyPimp
import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, Satoshi, SatoshiLong, Script, Transaction, TxId}
import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw, OnChainFeeConf}
Expand Down Expand Up @@ -266,12 +265,16 @@ case class NextRemoteCommit(sig: CommitSig, commit: RemoteCommit)
/**
* A minimal commitment for a given funding tx.
*
* @param fundingTxIndex index of the funding tx in the life of the channel:
* - initial funding tx has index 0
* - splice txs have index 1, 2, ...
* - commitments that share the same index are rbfed
* @param fundingTxIndex index of the funding tx in the life of the channel:
* - initial funding tx has index 0
* - splice txs have index 1, 2, ...
* - commitments that share the same index are rbfed
* @param firstRemoteCommitIndex index of the first remote commitment we signed that spends the funding transaction.
* Once the funding transaction confirms, our peer won't be able to publish revoked
* commitments with lower commitment indices.
*/
case class Commitment(fundingTxIndex: Long,
firstRemoteCommitIndex: Long,
remoteFundingPubKey: PublicKey,
localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus,
localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) {
Expand Down Expand Up @@ -730,6 +733,7 @@ object Commitment {
/** Subset of Commitments when we want to work with a single, specific commitment. */
case class FullCommitment(params: ChannelParams, changes: CommitmentChanges,
fundingTxIndex: Long,
firstRemoteCommitIndex: Long,
remoteFundingPubKey: PublicKey,
localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus,
localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) {
Expand All @@ -739,7 +743,7 @@ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges,
val commitInput = localCommit.commitTxAndRemoteSig.commitTx.input
val fundingTxId = commitInput.outPoint.txid
val capacity = commitInput.txOut.amount
val commitment = Commitment(fundingTxIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt)
val commitment = Commitment(fundingTxIndex, firstRemoteCommitIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt)

def localChannelReserve: Satoshi = commitment.localChannelReserve(params)

Expand Down Expand Up @@ -803,7 +807,7 @@ case class Commitments(params: ChannelParams,
lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min

// We always use the last commitment that was created, to make sure we never go back in time.
val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt)
val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.firstRemoteCommitIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt)

val all: Seq[Commitment] = active ++ inactive

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.fsm
import akka.actor.Status
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.pattern.pipe
import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script, TxHash}
import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script}
import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel.Helpers.Funding
Expand Down Expand Up @@ -277,6 +277,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
)
val commitment = Commitment(
fundingTxIndex = 0,
firstRemoteCommitIndex = 0,
remoteFundingPubKey = remoteFundingPubKey,
localFundingStatus = SingleFundedUnconfirmedFundingTx(None),
remoteFundingStatus = RemoteFundingStatus.NotLocked,
Expand Down Expand Up @@ -323,6 +324,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers {
case Success(_) =>
val commitment = Commitment(
fundingTxIndex = 0,
firstRemoteCommitIndex = 0,
remoteFundingPubKey = remoteFundingPubKey,
localFundingStatus = SingleFundedUnconfirmedFundingTx(Some(fundingTx)),
remoteFundingStatus = RemoteFundingStatus.NotLocked,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees
import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL}
import fr.acinq.eclair.db.RevokedHtlcInfoCleaner
import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand Down Expand Up @@ -83,6 +84,12 @@ trait CommonFundingHandlers extends CommonHandlers {
}
val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid))
context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx))
// When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the
// previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can
// clean-up the htlc data that we were storing for the matching penalty transactions.
d.commitments.all.find(_.fundingTxId == w.tx.txid).map(_.firstRemoteCommitIndex).foreach {
commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, beforeCommitIndex = commitIndex))
}
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>
// First of all, we watch the funding tx that is now confirmed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ object InteractiveTxSigningSession {
LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit =>
if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) {
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams)
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
SendingSigs(fundingStatus, commitment, fundingTx.localSigs)
} else {
this.copy(localCommit = Right(signedLocalCommit))
Expand All @@ -989,7 +989,7 @@ object InteractiveTxSigningSession {
case Right(fullySignedTx) =>
log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length)
val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams)
val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None)
Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond}
import fr.acinq.eclair.channel.PersistentChannelData
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.{CltvExpiry, Paginated}

trait ChannelsDb {

Expand All @@ -30,8 +30,15 @@ trait ChannelsDb {

def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit

/** Mark a channel as closed, but keep it in the DB. */
def removeChannel(channelId: ByteVector32): Unit

/** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */
def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit

/** Remove up to batchSize obsolete revoked HTLC information. */
def removeHtlcInfos(batchSize: Int): Unit

def listLocalChannels(): Seq[PersistentChannelData]

def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package fr.acinq.eclair.db

import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{Actor, DiagnosticActorLogging, Props}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
Expand All @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams}
*/
class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging {

val auditDb: AuditDb = nodeParams.db.audit
val channelsDb: ChannelsDb = nodeParams.db.channels
private val auditDb: AuditDb = nodeParams.db.audit
private val channelsDb: ChannelsDb = nodeParams.db.channels

context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner")

context.system.eventStream.subscribe(self, classOf[PaymentSent])
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
Expand Down
10 changes: 10 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
runAsync(secondary.markHtlcInfosForRemoval(channelId, beforeCommitIndex))
primary.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
}

override def removeHtlcInfos(batchSize: Int): Unit = {
runAsync(secondary.removeHtlcInfos(batchSize))
primary.removeHtlcInfos(batchSize)
}

override def listLocalChannels(): Seq[PersistentChannelData] = {
runAsync(secondary.listLocalChannels())
primary.listLocalChannels()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.scalacompat.ByteVector32

import scala.concurrent.duration.FiniteDuration

/**
* When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was
* stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there
* is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data
* in small batches, at regular intervals.
*/
object RevokedHtlcInfoCleaner {

// @formatter:off
sealed trait Command
case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command
private case object DeleteBatch extends Command
// @formatter:on

case class Config(batchSize: Int, interval: FiniteDuration)

def apply(db: ChannelsDb, config: Config): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.self)
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
db.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
Behaviors.same
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
Behaviors.same
}
}
}
}

}
Loading
Loading