Skip to content

Commit

Permalink
add meta info to local_channels table
Browse files Browse the repository at this point in the history
Here is the rationale for implementing channel metadata as additional
columns in the `local_channels` table of the `channels` db, as opposed
to a dedicated `channel_metadata` table of a `audit` db:

1) There is a migration to do (in the `local_channels` table, no less!),
but it's just a table migration, as opposed to a data migration, if we
had to populate a new table in a separate database.
2) We don't need to worry about creating a new metadata line when a new
channel is created (compared to doing add-or-update stuff). It's only
_updating_ optional columns in a best-effort manner.
3) We don't need to worry about inconsistencies between two tables
located in two separated databases (that's a big one).
4) We may want to use the metadata during operations, not just for
audit purposes. For example to close channels that have stayed unused
for a long time.
5) The audit db is an append-only log of events and shouldn't be used
for anything else. There is no `UPDATE` sql statement in
`*AuditDb.scala`. The `channel_metadata` would break that heuristic.
  • Loading branch information
pm47 committed Mar 10, 2021
1 parent c09eae6 commit 4d0b6aa
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 24 deletions.
10 changes: 8 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@

package fr.acinq.eclair.db

import java.io.Closeable

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.CltvExpiry
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
import fr.acinq.eclair.payment.PaymentEvent

import java.io.Closeable

trait ChannelsDb extends Closeable {

def addOrUpdateChannel(state: HasCommitments): Unit

def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType)

def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent)

def removeChannel(channelId: ByteVector32): Unit

def listLocalChannels(): Seq[HasCommitments]
Expand Down
21 changes: 13 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import fr.acinq.eclair.payment._
*/
class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {

val db = nodeParams.db.audit
val auditDb: AuditDb = nodeParams.db.audit
val channelsDb: ChannelsDb = nodeParams.db.channels

context.system.eventStream.subscribe(self, classOf[PaymentEvent])
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
Expand All @@ -46,15 +47,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.recipientAmount.truncateToSatoshi.toLong)
PaymentMetrics.PaymentFees.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.feesPaid.truncateToSatoshi.toLong)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.parts.length)
db.add(e)
auditDb.add(e)

case _: PaymentFailed =>
PaymentMetrics.PaymentFailed.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).increment()

case e: PaymentReceived =>
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.amount.truncateToSatoshi.toLong)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.parts.length)
db.add(e)
auditDb.add(e)

case e: PaymentRelayed =>
PaymentMetrics.PaymentAmount
Expand All @@ -71,26 +72,29 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
case _: ChannelPaymentRelayed =>
}
db.add(e)
auditDb.add(e)

case e: NetworkFeePaid => db.add(e)
case e: NetworkFeePaid => auditDb.add(e)

case e: ChannelErrorOccurred =>
e.error match {
case LocalError(_) if e.isFatal => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Local).withTag(ChannelTags.Fatal, value = true).increment()
case LocalError(_) if !e.isFatal => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Local).withTag(ChannelTags.Fatal, value = false).increment()
case RemoteError(_) => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Remote).increment()
}
db.add(e)
auditDb.add(e)

case e: ChannelStateChanged =>
// NB: order matters!
e match {
case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, Some(commitments: Commitments)) =>
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment()
val event = ChannelLifecycleEvent.EventType.Created
db.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, event))
auditDb.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, event))
channelsDb.updateChannelMeta(channelId, event)
case ChannelStateChanged(_, _, _, _, WAIT_FOR_INIT_INTERNAL, _, _) =>
case ChannelStateChanged(_, channelId, _, _, OFFLINE, SYNCING, _) =>
channelsDb.updateChannelMeta(channelId, ChannelLifecycleEvent.EventType.Connected)
case ChannelStateChanged(_, _, _, _, _, CLOSING, _) =>
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closing).increment()
case _ => ()
Expand All @@ -99,7 +103,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
case e: ChannelClosed =>
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment()
val event = ChannelLifecycleEvent.EventType.Closed(e.closingType)
db.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
auditDb.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
channelsDb.updateChannelMeta(e.channelId, event)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.CltvExpiry
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
import fr.acinq.eclair.payment.{PaymentEvent, PaymentReceived, PaymentSent}
import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging
import javax.sql.DataSource

import java.sql.Statement
import javax.sql.DataSource
import scala.collection.immutable.Queue

class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
Expand All @@ -35,13 +38,25 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
import lock._

val DB_NAME = "channels"
val CURRENT_VERSION = 2
val CURRENT_VERSION = 3

def migration23(statement: Statement) = {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp BIGINT")
}

inTransaction { pg =>
using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp BIGINT, last_payment_sent_timestamp BIGINT, last_payment_received_timestamp BIGINT, last_connected_timestamp BIGINT, closed_timestamp BIGINT)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id TEXT NOT NULL, commitment_number TEXT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
Expand All @@ -56,7 +71,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
update.setBytes(1, data)
update.setString(2, state.channelId.toHex)
if (update.executeUpdate() == 0) {
using(pg.prepareStatement("INSERT INTO local_channels VALUES (?, ?, FALSE)")) { statement =>
using(pg.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, FALSE)")) { statement =>
statement.setString(1, state.channelId.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
Expand All @@ -66,6 +81,38 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
}
}

/**
* Helper method to factor updating timestamp columns
*/
private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
inTransaction { pg =>
using(pg.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
statement.setLong(1, System.currentTimeMillis)
statement.setString(2, channelId.toHex)
statement.executeUpdate()
}
}
}

override def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType): Unit = {
val timestampColumn_opt = event match {
case ChannelLifecycleEvent.EventType.Created => Some("created_timestamp")
case ChannelLifecycleEvent.EventType.Connected => Some("last_connected_timestamp")
case _: ChannelLifecycleEvent.EventType.Closed => Some("closed_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent): Unit = {
val timestampColumn_opt = event match {
case _: PaymentSent => Some("last_payment_sent_timestamp")
case _: PaymentReceived => Some("last_payment_received_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package fr.acinq.eclair.db.sqlite

import java.sql.{Connection, Statement}

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.CltvExpiry
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelLifecycleEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.payment.{PaymentEvent, PaymentReceived, PaymentSent}
import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging

import java.sql.{Connection, Statement}
import scala.collection.immutable.Queue

class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
Expand All @@ -34,7 +35,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
import SqliteUtils._

val DB_NAME = "channels"
val CURRENT_VERSION = 2
val CURRENT_VERSION = 3

// The SQLite documentation states that "It is not possible to enable or disable foreign key constraints in the middle
// of a multi-statement transaction (when SQLite is not in autocommit mode).".
Expand All @@ -49,13 +50,26 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN is_closed BOOLEAN NOT NULL DEFAULT 0")
}

def migration23(statement: Statement) = {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp INTEGER")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp INTEGER")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp INTEGER")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp INTEGER")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp INTEGER")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 1 =>
logger.warn(s"migrating db $DB_NAME, found version=1 current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id BLOB NOT NULL, commitment_number BLOB NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
Expand All @@ -69,7 +83,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
update.setBytes(1, data)
update.setBytes(2, state.channelId.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO local_channels VALUES (?, ?, 0)")) { statement =>
using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 0)")) { statement =>
statement.setBytes(1, state.channelId.toArray)
statement.setBytes(2, data)
statement.executeUpdate()
Expand All @@ -78,6 +92,36 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
}
}

/**
* Helper method to factor updating timestamp columns
*/
private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
using(sqlite.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
statement.setLong(1, System.currentTimeMillis)
statement.setBytes(2, channelId.toArray)
statement.executeUpdate()
}
}

override def updateChannelMeta(channelId: ByteVector32, event: ChannelLifecycleEvent.EventType): Unit = {
val timestampColumn_opt = event match {
case ChannelLifecycleEvent.EventType.Created => Some("created_timestamp")
case ChannelLifecycleEvent.EventType.Connected => Some("last_connected_timestamp")
case _: ChannelLifecycleEvent.EventType.Closed => Some("closed_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def updateChannelMeta(channelId: ByteVector32, event: PaymentEvent): Unit = {
val timestampColumn_opt = event match {
case _: PaymentSent => Some("last_payment_sent_timestamp")
case _: PaymentReceived => Some("last_payment_received_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
statement.setBytes(1, channelId.toArray)
Expand Down
Loading

0 comments on commit 4d0b6aa

Please sign in to comment.