Asynchronously clean up obsolete HTLC info from DB
When a channel is closed, we can forget the data from historical HTLCs
sent and received through that channel (data that is otherwise required to
punish cheating attempts by our peer).

We previously removed that data from the DB synchronously when the closing
transaction confirmed. However, this could create performance issues: the
`htlc_infos` table can be very large for busy nodes, and many other writes
may be happening concurrently.

We don't need to get rid of this data immediately: we only want to remove
it to avoid degrading the performance of active channels that read and
write to the `htlc_infos` table. We now mark channels as closed in a
dedicated table, and run a background actor that deletes batches of
obsolete htlc data at regular intervals. This ensures that the table is
eventually cleaned up, without impacting the performance of active
channels.

Fixes #2610 and #2702
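
In SQL terms, each tick of the cleaner boils down to the following (a sketch using the Postgres table names introduced by this commit, the default batch size, and :channel_id as a placeholder):

SELECT channel_id FROM local.closed_channels_to_clean_up LIMIT 1;
-- delete one batch of HTLC data for that channel
DELETE FROM local.htlc_infos
WHERE channel_id = :channel_id
  AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id = :channel_id LIMIT 50000);
-- once a batch deletes fewer than batch-size rows, the channel's entry is removed
DELETE FROM local.closed_channels_to_clean_up WHERE channel_id = :channel_id;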
t-bast committed Sep 21, 2023
1 parent 6f87137 commit efd305f
Showing 11 changed files with 251 additions and 41 deletions.
6 changes: 6 additions & 0 deletions eclair-core/src/main/resources/reference.conf
@@ -448,6 +448,12 @@ eclair {
migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary)
compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary)
}
closed-channels-cleaner {
// Number of old HTLCs to delete per batch: a higher value will clean up closed channels faster, but may have a higher impact on performance.
batch-size = 50000
// Frequency at which batches of HTLCs are deleted: a lower value will clean up closed channels faster, but may have a higher impact on performance.
interval = 5 minutes
}
}

file-backup {
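Operators can override these defaults in their own eclair.conf; a minimal sketch (values are illustrative, not recommendations):

eclair.db.closed-channels-cleaner {
  batch-size = 10000  // smaller batches reduce the per-tick impact on active channels
  interval = 1 minute // a shorter interval drains the backlog faster
}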
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
blockchainWatchdogThreshold: Int,
blockchainWatchdogSources: Seq[String],
onionMessageConfig: OnionMessageConfig,
purgeInvoicesInterval: Option[FiniteDuration]) {
purgeInvoicesInterval: Option[FiniteDuration],
closedChannelsCleanerConfig: ClosedChannelsCleaner.Config) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
@@ -599,7 +600,11 @@ object NodeParams extends Logging {
timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS),
maxAttempts = config.getInt("onion-messages.max-attempts"),
),
purgeInvoicesInterval = purgeInvoicesInterval
purgeInvoicesInterval = purgeInvoicesInterval,
closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(
batchSize = config.getInt("db.closed-channels-cleaner.batch-size"),
interval = FiniteDuration(config.getDuration("db.closed-channels-cleaner.interval").getSeconds, TimeUnit.SECONDS)
)
)
}
}
eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala
@@ -32,6 +32,8 @@ trait ChannelsDb {

def removeChannel(channelId: ByteVector32): Unit

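// Remove a batch of HTLC information from channels that have been closed; called at regular intervals by ClosedChannelsCleaner.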
def removeHtlcInfos(batchSize: Int): Unit

def listLocalChannels(): Seq[PersistentChannelData]

def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData]
eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala
@@ -0,0 +1,51 @@
/*
* Copyright 2023 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration.FiniteDuration

/**
* When a channel is closed, we can remove the information about old HTLCs that was stored in the DB to punish revoked commitments.
* We potentially have millions of rows to delete per channel, and there is no rush to remove them.
* We don't want this to negatively impact active channels, so this actor deletes that data in small batches, at regular intervals.
*/
object ClosedChannelsCleaner {

// @formatter:off
sealed trait Command
private case object DeleteBatch extends Command
// @formatter:on

case class Config(batchSize: Int, interval: FiniteDuration)

def apply(db: ChannelsDb, config: Config): Behavior[Command] = {
Behaviors.setup { _ =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
Behaviors.same
}
}
}
}

}
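
A minimal sketch of wiring the cleaner into a typed ActorSystem (the production wiring in DbEventHandler below spawns it as a supervised child instead; channelsDb is an assumed ChannelsDb instance):

import akka.actor.typed.ActorSystem
import scala.concurrent.duration.DurationInt
import fr.acinq.eclair.db.{ChannelsDb, ClosedChannelsCleaner}

val channelsDb: ChannelsDb = ??? // assumption: obtained from the node's database setup
val config = ClosedChannelsCleaner.Config(batchSize = 50000, interval = 5.minutes)
// Here the cleaner runs as the guardian behavior; every `interval` it deletes up to
// `batchSize` htlc_infos rows belonging to one closed channel.
val system = ActorSystem(ClosedChannelsCleaner(channelsDb, config), "closed-channels-cleaner")
// Note: startTimerWithFixedDelay (rather than fixed-rate scheduling) avoids tick
// pile-up if a DELETE batch ever runs longer than the configured interval.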
eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
@@ -16,6 +16,9 @@

package fr.acinq.eclair.db

import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{Actor, DiagnosticActorLogging, Props}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams}
*/
class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging {

val auditDb: AuditDb = nodeParams.db.audit
val channelsDb: ChannelsDb = nodeParams.db.channels
private val auditDb: AuditDb = nodeParams.db.audit
private val channelsDb: ChannelsDb = nodeParams.db.channels

context.spawn(Behaviors.supervise(ClosedChannelsCleaner(channelsDb, nodeParams.closedChannelsCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "closed-channels-cleaner")

context.system.eventStream.subscribe(self, classOf[PaymentSent])
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
@@ -11,7 +11,7 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
import grizzled.slf4j.Logging

import java.io.File
@@ -225,6 +225,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

override def removeHtlcInfos(batchSize: Int): Unit = {
runAsync(secondary.removeHtlcInfos(batchSize))
primary.removeHtlcInfos(batchSize)
}

override def listLocalChannels(): Seq[PersistentChannelData] = {
runAsync(secondary.listLocalChannels())
primary.listLocalChannels()
eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
@@ -36,7 +36,7 @@ import javax.sql.DataSource

object PgChannelsDb {
val DB_NAME = "channels"
val CURRENT_VERSION = 8
val CURRENT_VERSION = 9
}

class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {
@@ -65,7 +65,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN last_connected_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + last_connected_timestamp * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN closed_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + closed_timestamp * interval '1 millisecond'")

statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT")
statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT")
}

def migration45(statement: Statement): Unit = {
@@ -115,12 +115,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)")
}

def migration89(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")

statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)")
statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))")
statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)")

statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))")
statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)")
@@ -145,6 +150,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
if (v < 8) {
migration78(statement)
}
if (v < 9) {
migration89(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
@@ -225,7 +233,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate()
}

using(pg.prepareStatement("DELETE FROM local.htlc_infos WHERE channel_id=?")) { statement =>
// The htlc_infos table may contain millions of rows, which is very expensive to delete synchronously.
// We instead run an asynchronous job to clean up that data in small batches.
using(pg.prepareStatement("INSERT INTO local.closed_channels_to_clean_up VALUES (?) ON CONFLICT DO NOTHING")) { statement =>
statement.setString(1, channelId.toHex)
statement.executeUpdate()
}
Expand All @@ -238,6 +248,30 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) {
withLock { pg =>
// Check if there are channels that need to be cleaned up.
val channelId_opt = using(pg.prepareStatement("SELECT channel_id FROM local.closed_channels_to_clean_up LIMIT 1")) { statement =>
statement.executeQuery().map(rs => rs.getByteVector32FromHex("channel_id")).lastOption
}
// Remove a batch of HTLC information for that channel.
channelId_opt.foreach(channelId => {
val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement =>
statement.setString(1, channelId.toHex)
statement.setString(2, channelId.toHex)
statement.executeUpdate()
}
// If we've deleted all HTLC information for that channel, we can now remove it from the DB.
if (deletedCount < batchSize) {
using(pg.prepareStatement("DELETE FROM local.closed_channels_to_clean_up WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
statement.executeUpdate()
}
}
})
}
}

override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) {
withLock { pg =>
using(pg.createStatement) { statement =>
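Note that the batched DELETE in removeHtlcInfos above relies on lookups by (channel_id, commitment_number) being cheap. The SQLite schema below creates htlc_infos_idx for exactly this; the matching Postgres index is not part of this excerpt, but would look like the following sketch (index name illustrative):

CREATE INDEX htlc_infos_idx ON local.htlc_infos (channel_id, commitment_number);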
eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala
@@ -24,15 +24,15 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli}
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Connection, Statement}

object SqliteChannelsDb {
val DB_NAME = "channels"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5
}

class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
@@ -79,10 +79,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
)(logger)
}

def migration45(): Unit = {
statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)")
statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)")
statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case Some(v@(1 | 2 | 3)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
@@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
if (v < 4) {
migration34()
}
if (v < 5) {
migration45()
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
@@ -152,7 +160,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
statement.executeUpdate()
}

using(sqlite.prepareStatement("DELETE FROM htlc_infos WHERE channel_id=?")) { statement =>
// The htlc_infos table may contain millions of rows, which is very expensive to delete synchronously.
// We instead run an asynchronous job to clean up that data in small batches.
using(sqlite.prepareStatement("INSERT INTO closed_channels_to_clean_up VALUES (?)")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.executeUpdate()
}
@@ -164,6 +174,28 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
}
}

override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) {
// Check if there are channels that need to be cleaned up.
val channelId_opt = using(sqlite.prepareStatement("SELECT channel_id FROM closed_channels_to_clean_up LIMIT 1")) { statement =>
statement.executeQuery().map(rs => rs.getByteVector32("channel_id")).lastOption
}
// Remove a batch of HTLC information for that channel.
channelId_opt.foreach(channelId => {
val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.setBytes(2, channelId.toArray)
statement.executeUpdate()
}
// If we've deleted all HTLC information for that channel, we can now remove it from the DB.
if (deletedCount < batchSize) {
using(sqlite.prepareStatement("DELETE FROM closed_channels_to_clean_up WHERE channel_id=?")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.executeUpdate()
}
}
})
}

override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) {
using(sqlite.createStatement) { statement =>
statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0")
@@ -175,21 +207,21 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) {
val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC"
remoteNodeId_opt match {
      case None =>
        using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement =>
          statement.executeQuery().mapCodec(channelDataCodec).toSeq
        }
      case Some(nodeId) =>
        using(sqlite.prepareStatement(sql)) { statement =>
          val filtered = statement.executeQuery()
            .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId)
          val limited = paginated_opt match {
            case None => filtered
            case Some(p) => filtered.slice(p.skip, p.skip + p.count)
          }
          limited.toSeq
        }
    }
  }

override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) {
Expand Down
eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala
@@ -23,6 +23,7 @@ import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, RemoteRbfLimits, UnhandledExceptionStrategy}
import fr.acinq.eclair.channel.{ChannelFlags, LocalParams}
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.ClosedChannelsCleaner
import fr.acinq.eclair.io.MessageRelay.RelayAll
import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection}
import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig
@@ -220,7 +221,8 @@ object TestConstants {
timeout = 200 millis,
maxAttempts = 2,
),
purgeInvoicesInterval = None
purgeInvoicesInterval = None,
closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis)
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(
@@ -381,7 +383,8 @@ object TestConstants {
timeout = 100 millis,
maxAttempts = 2,
),
purgeInvoicesInterval = None
purgeInvoicesInterval = None,
closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis)
)

def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams(