From f41bd22d69f870856301d59c9dfecc240ba2770e Mon Sep 17 00:00:00 2001 From: Bastien Teinturier <31281497+t-bast@users.noreply.github.com> Date: Wed, 14 Feb 2024 14:06:17 +0100 Subject: [PATCH] Asynchronously clean up obsolete HTLC info from DB (#2705) When a channel is closed, we can forget the data from historical HTLCs sent and received through that channel (which is otherwise required to punish cheating attempts by our peer). We previously synchronously removed that data from the DB when the closing transaction confirmed. However, this could create performance issues as the `htlc_infos` table can be very large for busy nodes and many concurrent writes may be happening at the same time. We don't need to get rid of this data immediately: we only want to remove it to avoid degrading the performance of active channels that read and write to the `htlc_infos` table. We now mark channels as closed in a dedicated table, and run a background actor that deletes batches of obsolete htlc data at regular intervals. This ensures that the table is eventually cleaned up, without impacting the performance of active channels. When a splice transaction confirms, all the revoked commitment transactions that only applied to the previous funding transaction cannot be published anymore, because the previous funding output has already been spent. We can thus forget all the historical HTLCs that were included in those commitments, because we will never need to generate the corresponding penalty transactions. This ensures that the growth of our DB is bounded, and will shrink every time a splice transaction is confirmed. Fixes #2610, #2702 and #2740 --- eclair-core/src/main/resources/reference.conf | 10 ++ .../scala/fr/acinq/eclair/NodeParams.scala | 9 +- .../fr/acinq/eclair/channel/Commitments.scala | 18 ++- .../channel/fsm/ChannelOpenSingleFunded.scala | 4 +- .../channel/fsm/CommonFundingHandlers.scala | 7 + .../channel/fund/InteractiveTxBuilder.scala | 4 +- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 9 +- .../fr/acinq/eclair/db/DbEventHandler.scala | 9 +- .../fr/acinq/eclair/db/DualDatabases.scala | 10 ++ .../eclair/db/RevokedHtlcInfoCleaner.scala | 59 ++++++++ .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 59 +++++++- .../eclair/db/sqlite/SqliteChannelsDb.scala | 89 +++++++++--- .../channel/version0/ChannelTypes0.scala | 1 + .../channel/version3/ChannelTypes3.scala | 2 +- .../channel/version4/ChannelCodecs4.scala | 136 +++++++++++++++--- .../scala/fr/acinq/eclair/TestConstants.scala | 7 +- .../eclair/channel/CommitmentsSpec.scala | 4 +- .../states/e/NormalSplicesStateSpec.scala | 79 +++++----- .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 79 ++++++++-- .../db/RevokedHtlcInfoCleanerSpec.scala | 84 +++++++++++ .../eclair/json/JsonSerializersSpec.scala | 2 +- .../eclair/payment/PaymentPacketSpec.scala | 2 +- .../internal/channel/ChannelCodecsSpec.scala | 4 +- .../channel/version4/ChannelCodecs4Spec.scala | 2 +- 24 files changed, 578 insertions(+), 111 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 89ae71d876..5113fe81d4 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -485,6 +485,16 @@ eclair { migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary) compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary) } + // During normal channel operation, we need to store information about past HTLCs to be able to punish our peer if + // they publish a revoked commitment. Once a channel closes or a splice transaction confirms, we can clean up past + // data (which reduces the size of our DB). Since there may be millions of rows to delete and we don't want to slow + // down the node, we delete those rows in batches at regular intervals. + revoked-htlc-info-cleaner { + // Number of rows to delete per batch: a higher value will clean up the DB faster, but may have a higher impact on performance. + batch-size = 50000 + // Frequency at which batches of rows are deleted: a lower value will clean up the DB faster, but may have a higher impact on performance. + interval = 15 minutes + } } file-backup { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index d77aab4687..86cc13e091 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, blockchainWatchdogThreshold: Int, blockchainWatchdogSources: Seq[String], onionMessageConfig: OnionMessageConfig, - purgeInvoicesInterval: Option[FiniteDuration]) { + purgeInvoicesInterval: Option[FiniteDuration], + revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -605,7 +606,11 @@ object NodeParams extends Logging { timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS), maxAttempts = config.getInt("onion-messages.max-attempts"), ), - purgeInvoicesInterval = purgeInvoicesInterval + purgeInvoicesInterval = purgeInvoicesInterval, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config( + batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"), + interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS) + ) ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index ce2c020ebd..3126f23318 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -1,7 +1,6 @@ package fr.acinq.eclair.channel import akka.event.LoggingAdapter -import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, Satoshi, SatoshiLong, Script, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw, OnChainFeeConf} @@ -266,12 +265,16 @@ case class NextRemoteCommit(sig: CommitSig, commit: RemoteCommit) /** * A minimal commitment for a given funding tx. * - * @param fundingTxIndex index of the funding tx in the life of the channel: - * - initial funding tx has index 0 - * - splice txs have index 1, 2, ... - * - commitments that share the same index are rbfed + * @param fundingTxIndex index of the funding tx in the life of the channel: + * - initial funding tx has index 0 + * - splice txs have index 1, 2, ... + * - commitments that share the same index are rbfed + * @param firstRemoteCommitIndex index of the first remote commitment we signed that spends the funding transaction. + * Once the funding transaction confirms, our peer won't be able to publish revoked + * commitments with lower commitment indices. */ case class Commitment(fundingTxIndex: Long, + firstRemoteCommitIndex: Long, remoteFundingPubKey: PublicKey, localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus, localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) { @@ -730,6 +733,7 @@ object Commitment { /** Subset of Commitments when we want to work with a single, specific commitment. */ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges, fundingTxIndex: Long, + firstRemoteCommitIndex: Long, remoteFundingPubKey: PublicKey, localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus, localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) { @@ -739,7 +743,7 @@ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges, val commitInput = localCommit.commitTxAndRemoteSig.commitTx.input val fundingTxId = commitInput.outPoint.txid val capacity = commitInput.txOut.amount - val commitment = Commitment(fundingTxIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt) + val commitment = Commitment(fundingTxIndex, firstRemoteCommitIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt) def localChannelReserve: Satoshi = commitment.localChannelReserve(params) @@ -803,7 +807,7 @@ case class Commitments(params: ChannelParams, lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min // We always use the last commitment that was created, to make sure we never go back in time. - val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt) + val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.firstRemoteCommitIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt) val all: Seq[Commitment] = active ++ inactive diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala index 0740907597..024738c32c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.fsm import akka.actor.Status import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.pattern.pipe -import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script, TxHash} +import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script} import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel.Helpers.Funding @@ -277,6 +277,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { ) val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteFundingPubKey, localFundingStatus = SingleFundedUnconfirmedFundingTx(None), remoteFundingStatus = RemoteFundingStatus.NotLocked, @@ -323,6 +324,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Success(_) => val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteFundingPubKey, localFundingStatus = SingleFundedUnconfirmedFundingTx(Some(fundingTx)), remoteFundingStatus = RemoteFundingStatus.NotLocked, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala index f846e44a06..e827205890 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx} import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream} import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -83,6 +84,12 @@ trait CommonFundingHandlers extends CommonHandlers { } val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx)) + // When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the + // previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can + // clean-up the htlc data that we were storing for the matching penalty transactions. + d.commitments.all.find(_.fundingTxId == w.tx.txid).map(_.firstRemoteCommitIndex).foreach { + commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, beforeCommitIndex = commitIndex)) + } d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map { case (commitments1, commitment) => // First of all, we watch the funding tx that is now confirmed. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 2d4afcbc94..d5b8f12c40 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -964,7 +964,7 @@ object InteractiveTxSigningSession { LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit => if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) { val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams) - val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) + val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) SendingSigs(fundingStatus, commitment, fundingTx.localSigs) } else { this.copy(localCommit = Right(signedLocalCommit)) @@ -989,7 +989,7 @@ object InteractiveTxSigningSession { case Right(fullySignedTx) => log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length) val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams) - val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) + val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs)) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index c312c9786d..6f1f34ecce 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -18,9 +18,9 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent +import fr.acinq.eclair.{CltvExpiry, Paginated} trait ChannelsDb { @@ -30,8 +30,15 @@ trait ChannelsDb { def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit + /** Mark a channel as closed, but keep it in the DB. */ def removeChannel(channelId: ByteVector32): Unit + /** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */ + def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit + + /** Remove up to batchSize obsolete revoked HTLC information. */ + def removeHtlcInfos(batchSize: Int): Unit + def listLocalChannels(): Seq[PersistentChannelData] def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 1b7eea5aab..e241db7a47 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -16,6 +16,9 @@ package fr.acinq.eclair.db +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, DiagnosticActorLogging, Props} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams} */ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging { - val auditDb: AuditDb = nodeParams.db.audit - val channelsDb: ChannelsDb = nodeParams.db.channels + private val auditDb: AuditDb = nodeParams.db.audit + private val channelsDb: ChannelsDb = nodeParams.db.channels + + context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner") context.system.eventStream.subscribe(self, classOf[PaymentSent]) context.system.eventStream.subscribe(self, classOf[PaymentFailed]) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 17078e0d91..bf6eb6e669 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -231,6 +231,16 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.removeChannel(channelId) } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = { + runAsync(secondary.markHtlcInfosForRemoval(channelId, beforeCommitIndex)) + primary.markHtlcInfosForRemoval(channelId, beforeCommitIndex) + } + + override def removeHtlcInfos(batchSize: Int): Unit = { + runAsync(secondary.removeHtlcInfos(batchSize)) + primary.removeHtlcInfos(batchSize) + } + override def listLocalChannels(): Seq[PersistentChannelData] = { runAsync(secondary.listLocalChannels()) primary.listLocalChannels() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala new file mode 100644 index 0000000000..98cc460914 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.bitcoin.scalacompat.ByteVector32 + +import scala.concurrent.duration.FiniteDuration + +/** + * When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was + * stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there + * is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data + * in small batches, at regular intervals. + */ +object RevokedHtlcInfoCleaner { + + // @formatter:off + sealed trait Command + case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command + private case object DeleteBatch extends Command + // @formatter:on + + case class Config(batchSize: Int, interval: FiniteDuration) + + def apply(db: ChannelsDb, config: Config): Behavior[Command] = { + Behaviors.setup { context => + context.system.eventStream ! EventStream.Subscribe(context.self) + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(DeleteBatch, config.interval) + Behaviors.receiveMessage { + case ForgetHtlcInfos(channelId, beforeCommitIndex) => + db.markHtlcInfosForRemoval(channelId, beforeCommitIndex) + Behaviors.same + case DeleteBatch => + db.removeHtlcInfos(config.batchSize) + Behaviors.same + } + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index a2f852befc..7a35e059d0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -36,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 8 + val CURRENT_VERSION = 9 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -65,7 +65,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN last_connected_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + last_connected_timestamp * interval '1 millisecond'") statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN closed_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + closed_timestamp * interval '1 millisecond'") - statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") + statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") } def migration45(statement: Statement): Unit = { @@ -115,12 +115,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") } + def migration89(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") @@ -145,6 +150,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 8) { migration78(statement) } + if (v < 9) { + migration89(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -225,10 +233,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate() } - using(pg.prepareStatement("DELETE FROM local.htlc_infos WHERE channel_id=?")) { statement => - statement.setString(1, channelId.toHex) - statement.executeUpdate() - } + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + markHtlcInfosForRemoval(channelId, Long.MaxValue) using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE, closed_timestamp=? WHERE channel_id=?")) { statement => statement.setTimestamp(1, Timestamp.from(Instant.now())) @@ -238,6 +245,46 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement("INSERT INTO local.htlc_infos_to_remove (channel_id, before_commitment_number) VALUES(?, ?) ON CONFLICT (channel_id) DO UPDATE SET before_commitment_number = EXCLUDED.before_commitment_number")) { statement => + statement.setString(1, channelId.toHex) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) { + withLock { pg => + // Check if there are channels that need to be cleaned up. + val channelToCleanUp_opt = using(pg.prepareStatement("SELECT channel_id, before_commitment_number FROM local.htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32FromHex("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption + } + // Remove a batch of HTLC information for that channel. + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? AND commitment_number + statement.setString(1, channelId.toHex) + statement.setString(2, channelId.toHex) + statement.setLong(3, beforeCommitmentNumber) + statement.executeUpdate() + } + logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber") + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(pg.prepareStatement("DELETE FROM local.htlc_infos_to_remove WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) + statement.executeUpdate() + } + } + } + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index ea07977280..e9f9f8d014 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,10 +79,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(): Unit = { + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") @@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45() + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -152,10 +160,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM htlc_infos WHERE channel_id=?")) { statement => - statement.setBytes(1, channelId.toArray) - statement.executeUpdate() - } + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + markHtlcInfosForRemoval(channelId, Long.MaxValue) using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? WHERE channel_id=?")) { statement => statement.setLong(1, TimestampMilli.now().toLong) @@ -164,6 +171,48 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE htlc_infos_to_remove SET before_commitment_number=? WHERE channel_id=?")) { update => + update.setLong(1, beforeCommitIndex) + update.setBytes(2, channelId.toArray) + if (update.executeUpdate() == 0) { + using(sqlite.prepareStatement("INSERT INTO htlc_infos_to_remove VALUES (?, ?)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + } + + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) { + // Check if there are channels that need to be cleaned up. + val channelToCleanUp_opt = using(sqlite.prepareStatement("SELECT channel_id, before_commitment_number FROM htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption + } + // Remove a batch of HTLC information for that channel. + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? AND commitment_number + statement.setBytes(1, channelId.toArray) + statement.setBytes(2, channelId.toArray) + statement.setLong(3, beforeCommitmentNumber) + statement.executeUpdate() + } + logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber") + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(sqlite.prepareStatement("DELETE FROM htlc_infos_to_remove WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) + statement.executeUpdate() + } + } + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") @@ -175,21 +224,21 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" remoteNodeId_opt match { - case None => - using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery().mapCodec(channelDataCodec).toSeq - } - case Some(nodeId) => - using(sqlite.prepareStatement(sql)) { statement => - val filtered = statement.executeQuery() - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq + case None => + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.executeQuery().mapCodec(channelDataCodec).toSeq + } + case Some(nodeId) => + using(sqlite.prepareStatement(sql)) { statement => + val filtered = statement.executeQuery() + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) } - } + limited.toSeq + } + } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala index 4d3d4bab63..e5fb015785 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala @@ -239,6 +239,7 @@ private[channel] object ChannelTypes0 { } val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, // We set an empty funding tx, even if it may be confirmed already (and the channel fully operational). We could // have set a specific Unknown status, but it would have forced us to keep it forever. We will retrieve the diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala index 2720b5b440..ce14345e8e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala @@ -45,7 +45,7 @@ private[channel] object ChannelTypes3 { def migrate(): channel.Commitments = channel.Commitments( ChannelParams(channelId, channelConfig, channelFeatures, localParams, remoteParams.migrate(), channelFlags), CommitmentChanges(localChanges, remoteChanges, localNextHtlcId, remoteNextHtlcId), - Seq(Commitment(fundingTxIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, remoteNextCommitInfo.left.toOption.map(w => NextRemoteCommit(w.sent, w.nextRemoteCommit)))), + Seq(Commitment(fundingTxIndex = 0, firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, remoteNextCommitInfo.left.toOption.map(w => NextRemoteCommit(w.sent, w.nextRemoteCommit)))), inactive = Nil, remoteNextCommitInfo.fold(w => Left(WaitForRev(w.sentAfterLocalCommitIndex)), remotePerCommitmentPoint => Right(remotePerCommitmentPoint)), remotePerCommitmentSecrets, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala index ffe5b5eee5..e8be2ded6e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala @@ -6,17 +6,16 @@ import fr.acinq.bitcoin.scalacompat.{OutPoint, ScriptWitness, Transaction, TxOut import fr.acinq.eclair.blockchain.fee.{ConfirmationPriority, ConfirmationTarget} import fr.acinq.eclair.channel.LocalFundingStatus._ import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTransaction, PartiallySignedSharedTransaction} import fr.acinq.eclair.channel.fund.InteractiveTxSigningSession.UnsignedLocalCommit import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession} import fr.acinq.eclair.crypto.ShaChain -import fr.acinq.eclair.MilliSatoshiLong -import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTransaction, PartiallySignedSharedTransaction} import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ import fr.acinq.eclair.wire.protocol.{TxSignatures, UpdateAddHtlc, UpdateMessage} -import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, PermanentChannelFeature, channel} +import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, MilliSatoshiLong, PermanentChannelFeature, channel} import scodec.bits.{BitVector, ByteVector} import scodec.codecs._ import scodec.{Attempt, Codec} @@ -369,7 +368,13 @@ private[channel] object ChannelCodecs4 { ("sharedTx" | signedSharedTransactionCodec) :: ("createdAt" | blockHeight) :: ("fundingParams" | fundingParamsCodec)).as[DualFundedUnconfirmedFundingTx].xmap( - dfu => (dfu.sharedTx.tx.sharedInput_opt, dfu.fundingParams.sharedInput_opt) match { + dfu => fillSharedInputScript(dfu), + dfu => dfu + ) + + // When decoding interactive-tx from older codecs, we fill the shared input publicKeyScript if necessary. + private def fillSharedInputScript(dfu: DualFundedUnconfirmedFundingTx): DualFundedUnconfirmedFundingTx = { + (dfu.sharedTx.tx.sharedInput_opt, dfu.fundingParams.sharedInput_opt) match { case (Some(sharedTxInput), Some(sharedFundingParamsInput)) if sharedTxInput.publicKeyScript.isEmpty => val sharedTxInput1 = sharedTxInput.copy(publicKeyScript = sharedFundingParamsInput.info.txOut.publicKeyScript) val sharedTx1 = dfu.sharedTx.tx.copy(sharedInput_opt = Some(sharedTxInput1)) @@ -379,9 +384,8 @@ private[channel] object ChannelCodecs4 { } dfu1 case _ => dfu - }, - dfu => dfu - ) + } + } val fundingTxStatusCodec: Codec[LocalFundingStatus] = discriminated[LocalFundingStatus].by(uint8) .typecase(0x01, optional(bool8, txCodec).as[SingleFundedUnconfirmedFundingTx]) @@ -428,8 +432,19 @@ private[channel] object ChannelCodecs4 { ("sig" | lengthDelimited(commitSigCodec)) :: ("commit" | remoteCommitCodec(commitmentSpecCodec))).as[NextRemoteCommit] + private def commitmentCodecWithoutFirstRemoteCommitIndex(htlcs: Set[DirectedHtlc]): Codec[Commitment] = ( + ("fundingTxIndex" | uint32) :: + ("firstRemoteCommitIndex" | provide(0L)) :: + ("fundingPubKey" | publicKey) :: + ("fundingTxStatus" | fundingTxStatusCodec) :: + ("remoteFundingStatus" | remoteFundingStatusCodec) :: + ("localCommit" | localCommitCodec(minimalCommitmentSpecCodec(htlcs))) :: + ("remoteCommit" | remoteCommitCodec(minimalCommitmentSpecCodec(htlcs.map(_.opposite)))) :: + ("nextRemoteCommit_opt" | optional(bool8, nextRemoteCommitCodec(minimalCommitmentSpecCodec(htlcs.map(_.opposite)))))).as[Commitment] + private def commitmentCodec(htlcs: Set[DirectedHtlc]): Codec[Commitment] = ( ("fundingTxIndex" | uint32) :: + ("firstRemoteCommitIndex" | uint64overflow) :: ("fundingPubKey" | publicKey) :: ("fundingTxStatus" | fundingTxStatusCodec) :: ("remoteFundingStatus" | remoteFundingStatusCodec) :: @@ -490,6 +505,21 @@ private[channel] object ChannelCodecs4 { } } + val commitmentsCodecWithoutFirstRemoteCommitIndex: Codec[Commitments] = ( + ("params" | paramsCodec) :: + ("changes" | changesCodec) :: + (("htlcs" | setCodec(htlcCodec)) >>:~ { htlcs => + ("active" | listOfN(uint16, commitmentCodecWithoutFirstRemoteCommitIndex(htlcs))) :: + ("inactive" | listOfN(uint16, commitmentCodecWithoutFirstRemoteCommitIndex(htlcs))) :: + ("remoteNextCommitInfo" | either(bool8, waitForRevCodec, publicKey)) :: + ("remotePerCommitmentSecrets" | byteAligned(ShaChain.shaChainCodec)) :: + ("originChannels" | originsMapCodec) :: + ("remoteChannelData_opt" | optional(bool8, varsizebinarydata)) + })).as[EncodedCommitments].xmap( + encoded => encoded.toCommitments, + commitments => EncodedCommitments(commitments) + ) + val commitmentsCodec: Codec[Commitments] = ( ("params" | paramsCodec) :: ("changes" | changesCodec) :: @@ -505,6 +535,9 @@ private[channel] object ChannelCodecs4 { commitments => EncodedCommitments(commitments) ) + val versionedCommitmentsCodec: Codec[Commitments] = discriminated[Commitments].by(uint8) + .typecase(0x01, commitmentsCodec) + val closingFeeratesCodec: Codec[ClosingFeerates] = ( ("preferred" | feeratePerKw) :: ("min" | feeratePerKw) :: @@ -565,13 +598,23 @@ private[channel] object ChannelCodecs4 { .\(0x02) { case status: SpliceStatus.SpliceWaitingForSigs => status }(interactiveTxWaitingForSigsCodec.as[channel.SpliceStatus.SpliceWaitingForSigs]) val DATA_WAIT_FOR_FUNDING_CONFIRMED_00_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("waitingSince" | blockHeight) :: + ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec))) :: + ("lastSent" | either(bool8, lengthDelimited(fundingCreatedCodec), lengthDelimited(fundingSignedCodec)))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED] + + val DATA_WAIT_FOR_FUNDING_CONFIRMED_0a_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = ( + ("commitments" | versionedCommitmentsCodec) :: ("waitingSince" | blockHeight) :: ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec))) :: ("lastSent" | either(bool8, lengthDelimited(fundingCreatedCodec), lengthDelimited(fundingSignedCodec)))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED] val DATA_WAIT_FOR_CHANNEL_READY_01_Codec: Codec[DATA_WAIT_FOR_CHANNEL_READY] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortIds" | shortids)).as[DATA_WAIT_FOR_CHANNEL_READY] + + val DATA_WAIT_FOR_CHANNEL_READY_0b_Codec: Codec[DATA_WAIT_FOR_CHANNEL_READY] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortIds" | shortids)).as[DATA_WAIT_FOR_CHANNEL_READY] val DATA_WAIT_FOR_DUAL_FUNDING_SIGNED_09_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED] = ( @@ -583,7 +626,16 @@ private[channel] object ChannelCodecs4 { ("remoteChannelData_opt" | optional(bool8, varsizebinarydata))).as[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED] val DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_02_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localPushAmount" | millisatoshi) :: + ("remotePushAmount" | millisatoshi) :: + ("waitingSince" | blockHeight) :: + ("lastChecked" | blockHeight) :: + ("rbfStatus" | rbfStatusCodec) :: + ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec)))).as[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] + + val DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_0c_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localPushAmount" | millisatoshi) :: ("remotePushAmount" | millisatoshi) :: ("waitingSince" | blockHeight) :: @@ -592,11 +644,25 @@ private[channel] object ChannelCodecs4 { ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec)))).as[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] val DATA_WAIT_FOR_DUAL_FUNDING_READY_03_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_READY] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortIds" | shortids)).as[DATA_WAIT_FOR_DUAL_FUNDING_READY] + + val DATA_WAIT_FOR_DUAL_FUNDING_READY_0d_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_READY] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortIds" | shortids)).as[DATA_WAIT_FOR_DUAL_FUNDING_READY] val DATA_NORMAL_04_Codec: Codec[DATA_NORMAL] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortids" | shortids) :: + ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: + ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: + ("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: + ("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: + ("closingFeerates" | optional(bool8, closingFeeratesCodec)) :: + ("spliceStatus" | spliceStatusCodec)).as[DATA_NORMAL] + + val DATA_NORMAL_0e_Codec: Codec[DATA_NORMAL] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortids" | shortids) :: ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: @@ -606,20 +672,45 @@ private[channel] object ChannelCodecs4 { ("spliceStatus" | spliceStatusCodec)).as[DATA_NORMAL] val DATA_SHUTDOWN_05_Codec: Codec[DATA_SHUTDOWN] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localShutdown" | lengthDelimited(shutdownCodec)) :: + ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: + ("closingFeerates" | optional(bool8, closingFeeratesCodec))).as[DATA_SHUTDOWN] + + val DATA_SHUTDOWN_0f_Codec: Codec[DATA_SHUTDOWN] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localShutdown" | lengthDelimited(shutdownCodec)) :: ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: ("closingFeerates" | optional(bool8, closingFeeratesCodec))).as[DATA_SHUTDOWN] val DATA_NEGOTIATING_06_Codec: Codec[DATA_NEGOTIATING] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localShutdown" | lengthDelimited(shutdownCodec)) :: + ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: + ("closingTxProposed" | listOfN(uint16, listOfN(uint16, lengthDelimited(closingTxProposedCodec)))) :: + ("bestUnpublishedClosingTx_opt" | optional(bool8, closingTxCodec))).as[DATA_NEGOTIATING] + + val DATA_NEGOTIATING_10_Codec: Codec[DATA_NEGOTIATING] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localShutdown" | lengthDelimited(shutdownCodec)) :: ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: ("closingTxProposed" | listOfN(uint16, listOfN(uint16, lengthDelimited(closingTxProposedCodec)))) :: ("bestUnpublishedClosingTx_opt" | optional(bool8, closingTxCodec))).as[DATA_NEGOTIATING] val DATA_CLOSING_07_Codec: Codec[DATA_CLOSING] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("waitingSince" | blockHeight) :: + ("finalScriptPubKey" | lengthDelimited(bytes)) :: + ("mutualCloseProposed" | listOfN(uint16, closingTxCodec)) :: + ("mutualClosePublished" | listOfN(uint16, closingTxCodec)) :: + ("localCommitPublished" | optional(bool8, localCommitPublishedCodec)) :: + ("remoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("nextRemoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("futureRemoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("revokedCommitPublished" | listOfN(uint16, revokedCommitPublishedCodec))).as[DATA_CLOSING] + + val DATA_CLOSING_11_Codec: Codec[DATA_CLOSING] = ( + ("commitments" | versionedCommitmentsCodec) :: ("waitingSince" | blockHeight) :: ("finalScriptPubKey" | lengthDelimited(bytes)) :: ("mutualCloseProposed" | listOfN(uint16, closingTxCodec)) :: @@ -631,12 +722,25 @@ private[channel] object ChannelCodecs4 { ("revokedCommitPublished" | listOfN(uint16, revokedCommitPublishedCodec))).as[DATA_CLOSING] val DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_08_Codec: Codec[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("remoteChannelReestablish" | channelReestablishCodec)).as[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] + + val DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_12_Codec: Codec[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] = ( + ("commitments" | versionedCommitmentsCodec) :: ("remoteChannelReestablish" | channelReestablishCodec)).as[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] } // Order matters! val channelDataCodec: Codec[PersistentChannelData] = discriminated[PersistentChannelData].by(uint16) + .typecase(0x12, Codecs.DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_12_Codec) + .typecase(0x11, Codecs.DATA_CLOSING_11_Codec) + .typecase(0x10, Codecs.DATA_NEGOTIATING_10_Codec) + .typecase(0x0f, Codecs.DATA_SHUTDOWN_0f_Codec) + .typecase(0x0e, Codecs.DATA_NORMAL_0e_Codec) + .typecase(0x0d, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_READY_0d_Codec) + .typecase(0x0c, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_0c_Codec) + .typecase(0x0b, Codecs.DATA_WAIT_FOR_CHANNEL_READY_0b_Codec) + .typecase(0x0a, Codecs.DATA_WAIT_FOR_FUNDING_CONFIRMED_0a_Codec) .typecase(0x09, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_SIGNED_09_Codec) .typecase(0x08, Codecs.DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_08_Codec) .typecase(0x07, Codecs.DATA_CLOSING_07_Codec) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 56ba361d16..4389229109 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,6 +23,7 @@ import fr.acinq.eclair.blockchain.fee._ import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, RemoteRbfLimits, UnhandledExceptionStrategy} import fr.acinq.eclair.channel.{ChannelFlags, LocalParams} import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig @@ -225,7 +226,8 @@ object TestConstants { timeout = 200 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -391,7 +393,8 @@ object TestConstants { timeout = 100 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala index f3cfa2940b..467b0f5f96 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala @@ -496,7 +496,7 @@ object CommitmentsSpec { Commitments( ChannelParams(randomBytes32(), ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = announceChannel)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, @@ -515,7 +515,7 @@ object CommitmentsSpec { Commitments( ChannelParams(randomBytes32(), ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = announceChannel)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index 11aae21ead..8dafe0e5bb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -33,8 +33,9 @@ import fr.acinq.eclair.channel.fsm.Channel import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.FullySignedSharedTransaction import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishReplaceableTx, PublishTx, SetChannelId} import fr.acinq.eclair.channel.states.ChannelStateTestsBase.{FakeTxPublisherFactory, PimpTestFSM} -import fr.acinq.eclair.channel.states.ChannelStateTestsTags.{AnchorOutputsZeroFeeHtlcTxs, NoMaxHtlcValueInFlight, ZeroConf} +import fr.acinq.eclair.channel.states.ChannelStateTestsTags._ import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos import fr.acinq.eclair.payment.relay.Relayer import fr.acinq.eclair.testutils.PimpTestProbe.convert import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing} @@ -57,7 +58,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging override def withFixture(test: OneArgTest): Outcome = { - val tags = test.tags + ChannelStateTestsTags.DualFunding + ChannelStateTestsTags.Splicing + val tags = test.tags + DualFunding + Splicing val setup = init(tags = tags) import setup._ reachNormal(setup, tags) @@ -275,10 +276,10 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik } test("recv CMD_SPLICE (splice-in, non dual-funded channel)") { () => - val f = init(tags = Set(ChannelStateTestsTags.DualFunding, ChannelStateTestsTags.Splicing)) + val f = init(tags = Set(DualFunding, Splicing)) import f._ - reachNormal(f, tags = Set(ChannelStateTestsTags.Splicing)) // we open a non dual-funded channel + reachNormal(f, tags = Set(Splicing)) // we open a non dual-funded channel alice2bob.ignoreMsg { case _: ChannelUpdate => true } bob2alice.ignoreMsg { case _: ChannelUpdate => true } awaitCond(alice.stateName == NORMAL && bob.stateName == NORMAL) @@ -303,7 +304,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(postSpliceState.commitments.latest.remoteChannelReserve == 15_000.sat) } - test("recv CMD_SPLICE (splice-in, local and remote commit index mismatch)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in, local and remote commit index mismatch)", Tag(Quiescence)) { f => import f._ // Alice and Bob asynchronously exchange HTLCs, which makes their commit indices diverge. @@ -378,7 +379,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik sender.expectMsgType[RES_FAILURE[_, _]] } - test("recv CMD_SPLICE (splice-out, would go below reserve, quiescent)", Tag(ChannelStateTestsTags.Quiescence), Tag(NoMaxHtlcValueInFlight)) { f => + test("recv CMD_SPLICE (splice-out, would go below reserve, quiescent)", Tag(Quiescence), Tag(NoMaxHtlcValueInFlight)) { f => import f._ setupHtlcs(f) @@ -471,7 +472,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testSpliceInAndOutCmd(f) } - test("recv CMD_SPLICE (splice-in + splice-out, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in + splice-out, quiescence)", Tag(Quiescence)) { f => testSpliceInAndOutCmd(f) } @@ -771,19 +772,27 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.inactive.map(_.fundingTxIndex) == Seq.empty) } - test("emit post-splice events", Tag(NoMaxHtlcValueInFlight)) { f => + test("emit post-splice events", Tag(NoMaxHtlcValueInFlight), Tag(Quiescence)) { f => import f._ + // Alice and Bob asynchronously exchange HTLCs, which makes their commit indices diverge. + addHtlc(25_000_000 msat, alice, bob, alice2bob, bob2alice) + addHtlc(50_000_000 msat, bob, alice, bob2alice, alice2bob) + crossSign(alice, bob, alice2bob, bob2alice) + val initialState = alice.stateData.asInstanceOf[DATA_NORMAL] assert(initialState.commitments.latest.capacity == 1_500_000.sat) - assert(initialState.commitments.latest.localCommit.spec.toLocal == 800_000_000.msat) - assert(initialState.commitments.latest.localCommit.spec.toRemote == 700_000_000.msat) + assert(initialState.commitments.latest.localCommit.spec.toLocal == 775_000_000.msat) + assert(initialState.commitments.latest.localCommit.spec.toRemote == 650_000_000.msat) + assert(initialState.commitments.localCommitIndex != initialState.commitments.remoteCommitIndex) val aliceEvents = TestProbe() val bobEvents = TestProbe() + systemA.eventStream.subscribe(aliceEvents.ref, classOf[ForgetHtlcInfos]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[AvailableBalanceChanged]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelUpdate]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelDown]) + systemB.eventStream.subscribe(bobEvents.ref, classOf[ForgetHtlcInfos]) systemB.eventStream.subscribe(bobEvents.ref, classOf[AvailableBalanceChanged]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelUpdate]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelDown]) @@ -797,14 +806,16 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx1) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) bob ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx1) bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) - aliceEvents.expectAvailableBalanceChanged(balance = 1_300_000_000.msat, capacity = 2_000_000.sat) - bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_000_000.sat) + aliceEvents.expectAvailableBalanceChanged(balance = 1_275_000_000.msat, capacity = 2_000_000.sat) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) + bobEvents.expectAvailableBalanceChanged(balance = 650_000_000.msat, capacity = 2_000_000.sat) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) @@ -812,13 +823,15 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) aliceEvents.expectNoMessage(100 millis) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) bobEvents.expectNoMessage(100 millis) alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx2) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) - aliceEvents.expectAvailableBalanceChanged(balance = 1_800_000_000.msat, capacity = 2_500_000.sat) - bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_500_000.sat) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) + aliceEvents.expectAvailableBalanceChanged(balance = 1_775_000_000.msat, capacity = 2_500_000.sat) + bobEvents.expectAvailableBalanceChanged(balance = 650_000_000.msat, capacity = 2_500_000.sat) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) } @@ -1124,7 +1137,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectCommitSigNotReceived(f) } - test("disconnect (commit_sig not received, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (commit_sig not received, quiescence)", Tag(Quiescence)) { f => testDisconnectCommitSigNotReceived(f) } @@ -1164,7 +1177,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectCommitSigReceivedByAlice(f) } - test("disconnect (commit_sig received by alice, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (commit_sig received by alice, quiescence)", Tag(Quiescence)) { f => testDisconnectCommitSigReceivedByAlice(f) } @@ -1206,7 +1219,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectTxSignaturesSentByBob(f) } - test("disconnect (tx_signatures sent by bob, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures sent by bob, quiescence)", Tag(Quiescence)) { f => testDisconnectTxSignaturesSentByBob(f) } @@ -1255,7 +1268,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectTxSignaturesReceivedByAlice(f) } - test("disconnect (tx_signatures received by alice, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures received by alice, quiescence)", Tag(Quiescence)) { f => testDisconnectTxSignaturesReceivedByAlice(f) } @@ -1301,11 +1314,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("disconnect (tx_signatures received by alice, zero-conf)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("disconnect (tx_signatures received by alice, zero-conf)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testDisconnectTxSignaturesReceivedByAliceZeroConf(f) } - test("disconnect (tx_signatures received by alice, zero-conf, quiescence)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs), Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures received by alice, zero-conf, quiescence)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs), Tag(Quiescence)) { f => testDisconnectTxSignaturesReceivedByAliceZeroConf(f) } @@ -1341,7 +1354,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik awaitCond(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NoSplice) } - test("don't resend splice_locked when zero-conf channel confirms", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("don't resend splice_locked when zero-conf channel confirms", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ initiateSplice(f, spliceIn_opt = Some(SpliceIn(500_000 sat, pushAmount = 0 msat))) @@ -1538,7 +1551,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesSimple(f) } - test("force-close with multiple splices (simple, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (simple, quiescence)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesSimple(f) } @@ -1620,7 +1633,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesPreviousActiveRemote(f) } - test("force-close with multiple splices (previous active remote, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (previous active remote, quiescence)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesPreviousActiveRemote(f) } @@ -1698,7 +1711,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesPreviousActiveRevoked(f) } - test("force-close with multiple splices (previous active revoked, quiescent)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (previous active revoked, quiescent)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesPreviousActiveRevoked(f) } @@ -1809,11 +1822,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) } - test("force-close with multiple splices (inactive remote)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive remote)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRemote(f) } - test("force-close with multiple splices (inactive remote, quiescence)", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive remote, quiescence)", Tag(Quiescence), Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRemote(f) } @@ -1928,11 +1941,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) } - test("force-close with multiple splices (inactive revoked)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive revoked)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRevoked(f) } - test("force-close with multiple splices (inactive revoked, quiescence)", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive revoked, quiescence)", Tag(Quiescence), Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRevoked(f) } @@ -1971,7 +1984,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2blockchain.expectNoMessage(100 millis) } - test("put back watches after restart (inactive)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("put back watches after restart (inactive)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ val fundingTx0 = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.signedTx_opt.get @@ -2030,7 +2043,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2blockchain.expectNoMessage(100 millis) } - test("recv CMD_SPLICE (splice-in + splice-out) with pre and post splice htlcs", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in + splice-out) with pre and post splice htlcs", Tag(Quiescence)) { f => import f._ val htlcs = setupHtlcs(f) @@ -2067,7 +2080,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("recv CMD_SPLICE (splice-in + splice-out) with pending htlcs, resolved after splice locked", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("recv CMD_SPLICE (splice-in + splice-out) with pending htlcs, resolved after splice locked", Tag(Quiescence), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ val htlcs = setupHtlcs(f) @@ -2086,7 +2099,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("recv multiple CMD_SPLICE (splice-in, splice-out, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv multiple CMD_SPLICE (splice-in, splice-out, quiescence)", Tag(Quiescence)) { f => val htlcs = setupHtlcs(f) initiateSplice(f, spliceIn_opt = Some(SpliceIn(500_000 sat))) @@ -2095,7 +2108,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = spliceOutFee(f, capacity = 1_900_000.sat)) } - test("recv invalid htlc signatures during splice-in", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv invalid htlc signatures during splice-in", Tag(Quiescence)) { f => import f._ val htlcs = setupHtlcs(f) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 25cd2ef44c..bc1e5c2a6a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,8 +18,7 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey -import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} import fr.acinq.eclair.channel.RealScidStatus import fr.acinq.eclair.db.ChannelsDbSpec.{getPgTimestamp, getTimestamp, testCases} @@ -31,7 +30,7 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, TimestampSecond, randomBytes32, randomKey} +import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.ByteVector @@ -59,7 +58,6 @@ class ChannelsDbSpec extends AnyFunSuite { test("add/remove/list channels") { forAllDbs { dbs => val db = dbs.channels - dbs.pendingCommands // needed by db.removeChannel val channel1 = ChannelCodecsSpec.normal val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) @@ -73,7 +71,7 @@ class ChannelsDbSpec extends AnyFunSuite { intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel - assert(db.listLocalChannels().toSet == Set.empty) + assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) db.addOrUpdateChannel(channel1) assert(db.listLocalChannels() == List(channel1)) @@ -85,11 +83,11 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel1, channel2b)) assert(db.getChannel(channel2b.channelId).contains(channel2b)) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).isEmpty) db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1) db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber + 1).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) @@ -97,11 +95,70 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel2b)) assert(db.listClosedChannels(None, None) == List(channel1)) assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) + db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) - assert(db.listLocalChannels() == Nil) + assert(db.listLocalChannels().isEmpty) + } + } + + test("remove htlc infos") { + forAllDbs { dbs => + val db = dbs.channels + + val channel1 = ChannelCodecsSpec.normal + val channel2 = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) + db.addOrUpdateChannel(channel1) + db.addOrUpdateChannel(channel2) + + val commitNumberSplice1 = 50 + val commitNumberSplice2 = 75 + + // The first channel has one splice transaction and is then closed. + db.addHtlcInfo(channel1.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel1.channelId, commitNumberSplice1) + db.addHtlcInfo(channel1.channelId, 51, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 52, randomBytes32(), CltvExpiry(561)) + db.removeChannel(channel1.channelId) + + // The second channel has two splice transactions. + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice1) + db.addHtlcInfo(channel2.channelId, 74, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 75, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 76, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice2) + + // We asynchronously clean-up the HTLC data from the DB in small batches. + val obsoleteHtlcInfo = Seq( + (channel1.channelId, 49), + (channel1.channelId, 50), + (channel1.channelId, 51), + (channel1.channelId, 52), + (channel2.channelId, 48), + (channel2.channelId, 49), + (channel2.channelId, 50), + (channel2.channelId, 74), + ) + db.removeHtlcInfos(10) // This should remove all the data for one of the two channels in one batch + assert(obsoleteHtlcInfo.flatMap { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber) }.size == 5) + db.removeHtlcInfos(3) // This should remove only part of the data for the remaining channel + assert(obsoleteHtlcInfo.flatMap { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber) }.size == 2) + db.removeHtlcInfos(3) // This should remove the rest of the data for the remaining channel + obsoleteHtlcInfo.foreach { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber).isEmpty } + + // The remaining HTLC data shouldn't be removed. + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) + db.removeHtlcInfos(10) // no-op + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala new file mode 100644 index 0000000000..2200e97534 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.eventstream.EventStream +import com.softwaremill.quicklens.ModifyPimp +import com.typesafe.config.ConfigFactory +import fr.acinq.eclair.TestDatabases.TestSqliteDatabases +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos +import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec +import fr.acinq.eclair.{CltvExpiry, randomBytes32} +import org.scalatest.funsuite.AnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class RevokedHtlcInfoCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("remove htlc info from closed channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 17, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 19, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 23, randomBytes32(), CltvExpiry(1729)) + channelsDb.removeChannel(channelId) + assert(channelsDb.listHtlcInfos(channelId, 17).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).nonEmpty) + + val config = RevokedHtlcInfoCleaner.Config(batchSize = 1, interval = 10 millis) + testKit.spawn(RevokedHtlcInfoCleaner(channelsDb, config)) + + eventually { + assert(channelsDb.listHtlcInfos(channelId, 17).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).isEmpty) + } + } + + test("remove htlc info from spliced channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 1, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 4, randomBytes32(), CltvExpiry(2465)) + (1 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + val config = RevokedHtlcInfoCleaner.Config(batchSize = 2, interval = 10 millis) + val htlcCleaner = testKit.spawn(RevokedHtlcInfoCleaner(channelsDb, config)) + + htlcCleaner ! ForgetHtlcInfos(channelId, beforeCommitIndex = 3) + eventually { + (1 to 2).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + testKit.system.eventStream ! EventStream.Publish(ForgetHtlcInfos(channelId, beforeCommitIndex = 5)) + eventually { + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala index 90ff1ecde8..6ff72c66cd 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala @@ -133,7 +133,7 @@ class JsonSerializersSpec extends TestKitBaseClass with AnyFunSuiteLike with Mat Commitments( ChannelParams(dummyBytes32, ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = true)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, dummyPublicKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, dummyPublicKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(dummyPublicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala index d3b7be71be..624d566df9 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala @@ -737,7 +737,7 @@ object PaymentPacketSpec { new Commitments( ChannelParams(channelId, ChannelConfig.standard, channelFeatures, localParams, remoteParams, channelFlags), CommitmentChanges(localChanges, remoteChanges, 0, 0), - List(Commitment(0, null, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, null, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala index cde6a80a7d..b6df964c9c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala @@ -93,7 +93,7 @@ class ChannelCodecsSpec extends AnyFunSuite { // and re-encode it with the new codec val bin_new = ByteVector(channelDataCodec.encode(data_new).require.toByteVector.toArray) // data should now be encoded under the new format - assert(bin_new.startsWith(hex"040000")) + assert(bin_new.startsWith(hex"04000a")) // now let's decode it again val data_new2 = channelDataCodec.decode(bin_new.toBitVector).require.value // data should match perfectly @@ -324,7 +324,7 @@ object ChannelCodecsSpec { val commitments = Commitments( ChannelParams(channelId, ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, channelFlags), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 32, remoteNextHtlcId = 4), - Seq(Commitment(fundingTxIndex, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.NotLocked, localCommit, remoteCommit, None)), + Seq(Commitment(fundingTxIndex, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.NotLocked, localCommit, remoteCommit, None)), remoteNextCommitInfo = Right(randomKey().publicKey), remotePerCommitmentSecrets = ShaChain.init, originChannels = origins) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala index 0eeb1bd1cc..a1706baef6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala @@ -182,7 +182,7 @@ class ChannelCodecs4Spec extends AnyFunSuite { PrivateKey(ByteVector.fromValidHex("02" * 32)).publicKey )), remoteFundingPubKey = PrivateKey(ByteVector.fromValidHex("01" * 32)).publicKey, - localOutputs = Nil, lockTime = 0, dustLimit = 330.sat, targetFeerate = FeeratePerKw(FeeratePerByte(3.sat)), requireConfirmedInputs = RequireConfirmedInputs(false, false)) + localOutputs = Nil, lockTime = 0, dustLimit = 330.sat, targetFeerate = FeeratePerKw(FeeratePerByte(3.sat)), requireConfirmedInputs = RequireConfirmedInputs(forLocal = false, forRemote = false)), ) assert(decoded == dualFundedUnconfirmedFundingTx) }