From 4d0b6aa6b2ea3fd4d2db814bdfb29b169ac1a02e Mon Sep 17 00:00:00 2001
From: pm47
Date: Mon, 8 Mar 2021 15:53:36 +0100
Subject: [PATCH] add meta info to local_channels table

Here is the rationale for implementing channel metadata as additional columns
in the `local_channels` table of the `channels` db, as opposed to a dedicated
`channel_metadata` table in an `audit` db:

1) There is a migration to do (in the `local_channels` table, no less!), but
   it is only a table migration, as opposed to the data migration we would
   need if we had to populate a new table in a separate database.
2) We don't need to worry about creating a new metadata row when a new channel
   is created (no add-or-update logic): we only _update_ optional columns, in
   a best-effort manner.
3) We don't need to worry about inconsistencies between two tables located in
   two separate databases (that's a big one).
4) We may want to use the metadata during operations, not just for audit
   purposes, for example to close channels that have stayed unused for a long
   time.
5) The audit db is an append-only log of events and shouldn't be used for
   anything else: there is no `UPDATE` sql statement in `*AuditDb.scala`, and
   a `channel_metadata` table there would break that rule.
---
 .../scala/fr/acinq/eclair/db/ChannelsDb.scala |  10 +-
 .../fr/acinq/eclair/db/DbEventHandler.scala   |  21 ++--
 .../fr/acinq/eclair/db/pg/PgChannelsDb.scala  |  55 ++++++++-
 .../eclair/db/sqlite/SqliteChannelsDb.scala   |  54 ++++++++-
 .../eclair/db/SqliteChannelsDbSpec.scala      | 107 +++++++++++++++++-
 5 files changed, 223 insertions(+), 24 deletions(-)

diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala
index 00824c303a..2dd974826c 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala
@@ -16,16 +16,22 @@
 
 package fr.acinq.eclair.db
 
-import java.io.Closeable
-
 import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.CltvExpiry
 import fr.acinq.eclair.channel.HasCommitments
+import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
+import fr.acinq.eclair.payment.PaymentEvent
+
+import java.io.Closeable
 
 trait ChannelsDb extends Closeable {
 
   def addOrUpdateChannel(state: HasCommitments): Unit
 
+  def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType)
+
+  def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent)
+
   def removeChannel(channelId: ByteVector32): Unit
 
   def listLocalChannels(): Seq[HasCommitments]
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
index dd24f3abc0..f6bf83f325 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
@@ -32,7 +32,8 @@ import fr.acinq.eclair.payment._
  */
 class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
 
-  val db = nodeParams.db.audit
+  val auditDb: AuditDb = nodeParams.db.audit
+  val channelsDb: ChannelsDb = nodeParams.db.channels
 
   context.system.eventStream.subscribe(self, classOf[PaymentEvent])
   context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
@@ -46,7 +47,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
       PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.recipientAmount.truncateToSatoshi.toLong)
       PaymentMetrics.PaymentFees.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.feesPaid.truncateToSatoshi.toLong)
       PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.parts.length)
-      db.add(e)
+      auditDb.add(e)
 
     case _: PaymentFailed =>
       PaymentMetrics.PaymentFailed.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).increment()
@@ -54,7 +55,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
     case e: PaymentReceived =>
       PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.amount.truncateToSatoshi.toLong)
       PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.parts.length)
-      db.add(e)
+      auditDb.add(e)
 
     case e: PaymentRelayed =>
       PaymentMetrics.PaymentAmount
@@ -71,9 +72,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
           PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
         case _: ChannelPaymentRelayed =>
       }
-      db.add(e)
+      auditDb.add(e)
 
-    case e: NetworkFeePaid => db.add(e)
+    case e: NetworkFeePaid => auditDb.add(e)
 
     case e: ChannelErrorOccurred =>
       e.error match {
@@ -81,7 +82,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
        case LocalError(_) if !e.isFatal => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Local).withTag(ChannelTags.Fatal, value = false).increment()
        case RemoteError(_) => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Remote).increment()
       }
-      db.add(e)
+      auditDb.add(e)
 
     case e: ChannelStateChanged =>
       // NB: order matters!
@@ -89,8 +90,11 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
        case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, Some(commitments: Commitments)) =>
          ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment()
          val event = ChannelLifecycleEvent.EventType.Created
-         auditDb.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, event))
+         auditDb.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, event))
+         channelsDb.updateChannelMeta(channelId, event)
        case ChannelStateChanged(_, _, _, _, WAIT_FOR_INIT_INTERNAL, _, _) =>
+       case ChannelStateChanged(_, channelId, _, _, OFFLINE, SYNCING, _) =>
+         channelsDb.updateChannelMeta(channelId, ChannelLifecycleEvent.EventType.Connected)
        case ChannelStateChanged(_, _, _, _, _, CLOSING, _) =>
          ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closing).increment()
        case _ => ()
@@ -99,7 +103,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
     case e: ChannelClosed =>
       ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment()
       val event = ChannelLifecycleEvent.EventType.Closed(e.closingType)
-      db.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
+      auditDb.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
+      channelsDb.updateChannelMeta(e.channelId, event)
 
   }
 
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
index 852a1d28ae..1519da3c24 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
@@ -20,12 +20,15 @@
 import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.CltvExpiry
 import fr.acinq.eclair.channel.HasCommitments
 import fr.acinq.eclair.db.ChannelsDb
+import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
 import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
 import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
+import fr.acinq.eclair.payment.{PaymentEvent, PaymentReceived, PaymentSent}
 import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
 import grizzled.slf4j.Logging
-import javax.sql.DataSource
+import java.sql.Statement
+import javax.sql.DataSource
 import scala.collection.immutable.Queue
 
 class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
@@ -35,13 +38,25 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
   import lock._
 
   val DB_NAME = "channels"
-  val CURRENT_VERSION = 2
+  val CURRENT_VERSION = 3
+
+  def migration23(statement: Statement) = {
+    statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp BIGINT")
+    statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp BIGINT")
+    statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp BIGINT")
+    statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp BIGINT")
+    statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp BIGINT")
+  }
 
   inTransaction { pg =>
     using(pg.createStatement()) { statement =>
       getVersion(statement, DB_NAME, CURRENT_VERSION) match {
+        case 2 =>
+          logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
+          migration23(statement)
+          setVersion(statement, DB_NAME, CURRENT_VERSION)
         case CURRENT_VERSION =>
-          statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE)")
+          statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp BIGINT, last_payment_sent_timestamp BIGINT, last_payment_received_timestamp BIGINT, last_connected_timestamp BIGINT, closed_timestamp BIGINT)")
           statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id TEXT NOT NULL, commitment_number TEXT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
           statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
         case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
@@ -56,7 +71,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
         update.setBytes(1, data)
         update.setString(2, state.channelId.toHex)
         if (update.executeUpdate() == 0) {
-          using(pg.prepareStatement("INSERT INTO local_channels VALUES (?, ?, FALSE)")) { statement =>
+          using(pg.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, FALSE)")) { statement =>
             statement.setString(1, state.channelId.toHex)
             statement.setBytes(2, data)
             statement.executeUpdate()
@@ -66,6 +81,38 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
     }
   }
 
+  /**
+   * Helper method to factor updating timestamp columns
+   */
+  private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
+    inTransaction { pg =>
+      using(pg.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
+        statement.setLong(1, System.currentTimeMillis)
+        statement.setString(2, channelId.toHex)
+        statement.executeUpdate()
+      }
+    }
+  }
+
+  override def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType): Unit = {
+    val timestampColumn_opt = event match {
+      case ChannelLifecycleEvent.EventType.Created => Some("created_timestamp")
+      case ChannelLifecycleEvent.EventType.Connected => Some("last_connected_timestamp")
+      case _: ChannelLifecycleEvent.EventType.Closed => Some("closed_timestamp")
+      case _ => None
+    }
+    timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
+  }
+
+  override def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent): Unit = {
+    val timestampColumn_opt = event match {
+      case _: PaymentSent => Some("last_payment_sent_timestamp")
+      case _: PaymentReceived => Some("last_payment_received_timestamp")
+      case _ => None
+    }
+    timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
+  }
+
   override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
     withLock { pg =>
       using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala
index a49cc6c3ac..b2afbda7fa 100644
--- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala
+++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala
@@ -16,16 +16,17 @@
 
 package fr.acinq.eclair.db.sqlite
 
-import java.sql.{Connection, Statement}
-
 import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.CltvExpiry
 import fr.acinq.eclair.channel.HasCommitments
 import fr.acinq.eclair.db.ChannelsDb
+import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
 import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
+import fr.acinq.eclair.payment.{PaymentEvent, PaymentReceived, PaymentSent}
 import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
 import grizzled.slf4j.Logging
 
+import java.sql.{Connection, Statement}
 import scala.collection.immutable.Queue
 
 class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
@@ -34,7 +35,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
   import SqliteUtils._
 
   val DB_NAME = "channels"
-  val CURRENT_VERSION = 2
+  val CURRENT_VERSION = 3
 
   // The SQLite documentation states that "It is not possible to enable or disable foreign key constraints in the middle
   // of a multi-statement transaction (when SQLite is not in autocommit mode).".
@@ -49,13 +50,26 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
       statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN is_closed BOOLEAN NOT NULL DEFAULT 0")
     }
 
+    def migration23(statement: Statement) = {
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp INTEGER")
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp INTEGER")
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp INTEGER")
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp INTEGER")
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp INTEGER")
+    }
+
     getVersion(statement, DB_NAME, CURRENT_VERSION) match {
       case 1 =>
         logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
         migration12(statement)
+        migration23(statement)
+        setVersion(statement, DB_NAME, CURRENT_VERSION)
+      case 2 =>
+        logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
+        migration23(statement)
         setVersion(statement, DB_NAME, CURRENT_VERSION)
       case CURRENT_VERSION =>
-        statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)")
+        statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)")
         statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
         statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
       case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
@@ -69,7 +83,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
       update.setBytes(1, data)
       update.setBytes(2, state.channelId.toArray)
       if (update.executeUpdate() == 0) {
-        using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, 0)")) { statement =>
+        using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 0)")) { statement =>
           statement.setBytes(1, state.channelId.toArray)
           statement.setBytes(2, data)
           statement.executeUpdate()
@@ -78,6 +92,36 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
     }
   }
 
+  /**
+   * Helper method to factor updating timestamp columns
+   */
+  private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
+    using(sqlite.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
+      statement.setLong(1, System.currentTimeMillis)
+      statement.setBytes(2, channelId.toArray)
+      statement.executeUpdate()
+    }
+  }
+
+  override def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType): Unit = {
+    val timestampColumn_opt = event match {
+      case ChannelLifecycleEvent.EventType.Created => Some("created_timestamp")
+      case ChannelLifecycleEvent.EventType.Connected => Some("last_connected_timestamp")
+      case _: ChannelLifecycleEvent.EventType.Closed => Some("closed_timestamp")
+      case _ => None
+    }
+    timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
+  }
+
+  override def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent): Unit = {
+    val timestampColumn_opt = event match {
+      case _: PaymentSent => Some("last_payment_sent_timestamp")
+      case _: PaymentReceived => Some("last_payment_received_timestamp")
+      case _ => None
+    }
+    timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
+  }
+
   override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
     using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
       statement.setBytes(1, channelId.toArray)
diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala
index 02fa64be24..35b73d12d1 100644
--- a/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala
+++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteChannelsDbSpec.scala
@@ -16,18 +16,22 @@
 
 package fr.acinq.eclair.db
 
-import java.sql.SQLException
-
+import com.softwaremill.quicklens._
 import fr.acinq.bitcoin.ByteVector32
-import fr.acinq.eclair.CltvExpiry
 import fr.acinq.eclair.TestConstants.{TestPgDatabases, TestSqliteDatabases, forAllDbs}
+import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
 import fr.acinq.eclair.db.sqlite.SqliteChannelsDb
+import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._
 import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, using}
+import fr.acinq.eclair.payment.{PaymentReceived, PaymentSent}
 import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
 import fr.acinq.eclair.wire.ChannelCodecsSpec
+import fr.acinq.eclair.{CltvExpiry, MilliSatoshiLong, randomBytes32, randomKey}
 import org.scalatest.funsuite.AnyFunSuite
 import scodec.bits.ByteVector
 
+import java.sql.SQLException
+
 class SqliteChannelsDbSpec extends AnyFunSuite {
 
   test("init sqlite 2 times in a row") {
@@ -69,7 +73,64 @@ class SqliteChannelsDbSpec extends AnyFunSuite {
     }
   }
 
-  test("migrate channel database v1 -> v2") {
+  test("channel metadata") {
+    forAllDbs { dbs =>
+      val db = dbs.channels()
+      val connection = dbs.connection
+
+      val channel1 = ChannelCodecsSpec.normal
+      val channel2 = channel1.modify(_.commitments.channelId).setTo(randomBytes32)
+
+      def getTimestamp(channelId: ByteVector32, columnName: String): Option[Long] = {
+        using(connection.prepareStatement(s"SELECT $columnName FROM local_channels WHERE channel_id=?")) { statement =>
+          // data type differs depending on underlying database system
+          dbs match {
+            case _: TestPgDatabases => statement.setString(1, channelId.toHex)
+            case _: TestSqliteDatabases => statement.setBytes(1, channelId.toArray)
+          }
+          val rs = statement.executeQuery()
+          rs.next()
+          rs.getLongNullable(columnName)
+        }
+      }
+
+      // first we add channels
+      db.addOrUpdateChannel(channel1)
+      db.addOrUpdateChannel(channel2)
+
+      // make sure initially all metadata are empty
+      assert(getTimestamp(channel1.channelId, "created_timestamp").isEmpty)
+      assert(getTimestamp(channel1.channelId, "last_payment_sent_timestamp").isEmpty)
+      assert(getTimestamp(channel1.channelId, "last_payment_received_timestamp").isEmpty)
+      assert(getTimestamp(channel1.channelId, "last_connected_timestamp").isEmpty)
+      assert(getTimestamp(channel1.channelId, "closed_timestamp").isEmpty)
+
+      db.updateChannelMeta(channel1.channelId, ChannelLifecycleEvent.EventType.Created)
+      assert(getTimestamp(channel1.channelId, "created_timestamp").nonEmpty)
+
+      db.updateChannelMeta(channel1.channelId, PaymentSent(null, randomBytes32, randomBytes32, 40000 msat, randomKey.publicKey, PaymentSent.PartialPayment(null, 42000 msat, 1000 msat, randomBytes32, None) :: Nil))
+      assert(getTimestamp(channel1.channelId, "last_payment_sent_timestamp").nonEmpty)
+
+      db.updateChannelMeta(channel1.channelId, PaymentReceived(randomBytes32, PaymentReceived.PartialPayment(42000 msat, randomBytes32) :: Nil))
+      assert(getTimestamp(channel1.channelId, "last_payment_received_timestamp").nonEmpty)
+
+      db.updateChannelMeta(channel1.channelId, ChannelLifecycleEvent.EventType.Connected)
+      assert(getTimestamp(channel1.channelId, "last_connected_timestamp").nonEmpty)
+
+      db.updateChannelMeta(channel1.channelId, ChannelLifecycleEvent.EventType.Closed(null))
+      assert(getTimestamp(channel1.channelId, "closed_timestamp").nonEmpty)
+
+      // make sure all metadata are still empty for channel 2
+      assert(getTimestamp(channel2.channelId, "created_timestamp").isEmpty)
+      assert(getTimestamp(channel2.channelId, "last_payment_sent_timestamp").isEmpty)
+      assert(getTimestamp(channel2.channelId, "last_payment_received_timestamp").isEmpty)
+      assert(getTimestamp(channel2.channelId, "last_connected_timestamp").isEmpty)
+      assert(getTimestamp(channel2.channelId, "closed_timestamp").isEmpty)
+
+    }
+  }
+
+  test("migrate channel database v1 -> v3") {
     forAllDbs {
       case _: TestPgDatabases => // no migration
       case dbs: TestSqliteDatabases =>
@@ -96,9 +157,45 @@ class SqliteChannelsDbSpec extends AnyFunSuite {
         // check that db migration works
         val db = new SqliteChannelsDb(sqlite)
         using(sqlite.createStatement()) { statement =>
-          assert(getVersion(statement, "channels", 1) == 2) // version changed from 1 -> 2
+          assert(getVersion(statement, "channels", 1) == 3) // version changed from 1 -> 3
         }
         assert(db.listLocalChannels() === List(channel))
+        db.updateChannelMeta(channel.channelId, ChannelLifecycleEvent.EventType.Created) // this call must not fail
+    }
+  }
+
+  test("migrate channel database v2 -> v3") {
+    forAllDbs {
+      case _: TestPgDatabases => // no migration
+      case dbs: TestSqliteDatabases =>
+        val sqlite = dbs.connection
+
+        // create a v2 channels database
+        using(sqlite.createStatement()) { statement =>
+          getVersion(statement, "channels", 2)
+          statement.execute("PRAGMA foreign_keys = ON")
+          statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)")
+          statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
+          statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
+        }
+
+        // insert 1 row
+        val channel = ChannelCodecsSpec.normal
+        val data = stateDataCodec.encode(channel).require.toByteArray
+        using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, ?)")) { statement =>
+          statement.setBytes(1, channel.channelId.toArray)
+          statement.setBytes(2, data)
+          statement.setBoolean(3, false)
+          statement.executeUpdate()
+        }
+
+        // check that db migration works
+        val db = new SqliteChannelsDb(sqlite)
+        using(sqlite.createStatement()) { statement =>
+          assert(getVersion(statement, "channels", 2) == 3) // version changed from 2 -> 3
+        }
+        assert(db.listLocalChannels() === List(channel))
+        db.updateChannelMeta(channel.channelId, ChannelLifecycleEvent.EventType.Created) // this call must not fail
     }
   }
 }
\ No newline at end of file
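
A possible follow-up use of the new columns (sketch only, not part of this patch): rationale point 4 mentions using the metadata during operations, for example to close channels that have stayed unused for a long time. Assuming the SQLite schema created above, a read-only helper along the following lines could surface idle channels; the object name, method name and the COALESCE-based treatment of never-set timestamps are illustrative assumptions, not existing eclair APIs.

    import java.sql.Connection

    // Illustrative sketch: list channels with no recorded payment activity since `cutoffMillis`.
    // Timestamps in local_channels are written with System.currentTimeMillis (milliseconds since
    // epoch); a NULL column means the event was never recorded, which COALESCE(_, 0) treats as
    // older than any cutoff.
    object ChannelActivityReport {
      def staleChannelIds(sqlite: Connection, cutoffMillis: Long): List[String] = {
        val sql =
          """SELECT lower(hex(channel_id)) AS channel_id
            |FROM local_channels
            |WHERE is_closed = 0
            |  AND COALESCE(last_payment_sent_timestamp, 0) < ?
            |  AND COALESCE(last_payment_received_timestamp, 0) < ?""".stripMargin
        val statement = sqlite.prepareStatement(sql)
        try {
          statement.setLong(1, cutoffMillis)
          statement.setLong(2, cutoffMillis)
          val rs = statement.executeQuery()
          val channelIds = scala.collection.mutable.ListBuffer.empty[String]
          while (rs.next()) channelIds += rs.getString("channel_id")
          channelIds.toList
        } finally {
          statement.close()
        }
      }
    }

For example, staleChannelIds(connection, System.currentTimeMillis - 90L * 24 * 3600 * 1000) would return the ids of channels with no payment in the last 90 days, which an operator could then review and decide to close.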