Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index database metrics by backend #1758

Merged
merged 2 commits into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object Databases extends Logging {
}

object SqliteDatabases {
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = SqliteDatabases(
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): SqliteDatabases = SqliteDatabases(
network = new SqliteNetworkDb(networkJdbc),
audit = new SqliteAuditDb(auditJdbc),
channels = new SqliteChannelsDb(eclairJdbc),
Expand Down Expand Up @@ -155,7 +155,7 @@ object Databases extends Logging {
/**
* Given a parent folder it creates or loads all the databases from a JDBC connection
*/
def sqlite(dbdir: File): Databases = {
def sqlite(dbdir: File): SqliteDatabases = {
dbdir.mkdir()
var sqliteEclair: Connection = null
var sqliteNetwork: Connection = null
Expand Down
19 changes: 13 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@ package fr.acinq.eclair.db

import fr.acinq.eclair.KamonExt
import kamon.Kamon
import kamon.metric.Metric

object Monitoring {

object Metrics {
val FileBackupCompleted = Kamon.counter("db.file-backup.completed")
val FileBackupDuration = Kamon.timer("db.file-backup.duration")
val FileBackupCompleted: Metric.Counter = Kamon.counter("db.file-backup.completed")
val FileBackupDuration: Metric.Timer = Kamon.timer("db.file-backup.duration")

val DbOperation = Kamon.counter("db.operation.execute")
val DbOperationDuration = Kamon.timer("db.operation.duration")
private val DbOperation: Metric.Counter = Kamon.counter("db.operation.execute")
private val DbOperationDuration: Metric.Timer = Kamon.timer("db.operation.duration")

def withMetrics[T](name: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name)) {
DbOperation.withTag(Tags.DbOperation, name).increment()
def withMetrics[T](name: String, backend: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend)) {
DbOperation.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend).increment()
operation
}
}

object Tags {
val DbOperation = "operation"
val DbBackend = "backend"

object DbBackends {
val Sqlite = "sqlite"
val Postgres = "postgres"
}
}

}
13 changes: 7 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
Expand Down Expand Up @@ -66,7 +67,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
Expand All @@ -81,7 +82,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent") {
override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
Expand All @@ -102,7 +103,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received") {
override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
e.parts.foreach(p => {
Expand All @@ -117,7 +118,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed") {
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) {
inTransaction { pg =>
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
Expand All @@ -141,7 +142,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee") {
override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, e.channelId.toHex)
Expand All @@ -155,7 +156,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error") {
override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
val (errorName, errorMessage) = e.error match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -63,7 +64,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel") {
override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
withLock { pg =>
val data = stateDataCodec.encode(state).require.toByteArray
using(pg.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update =>
Expand Down Expand Up @@ -105,7 +106,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
}

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
Expand All @@ -124,7 +125,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels") {
override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels", DbBackends.Postgres) {
withLock { pg =>
using(pg.createStatement) { statement =>
val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=FALSE")
Expand All @@ -133,7 +134,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info") {
override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, channelId.toHex)
Expand All @@ -145,7 +146,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos") {
override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT payment_hash, cltv_expiry FROM htlc_infos WHERE channel_id=? AND commitment_number=?")) { statement =>
statement.setString(1, channelId.toHex)
Expand Down
27 changes: 14 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package fr.acinq.eclair.db.pg
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.ShortChannelId
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Router.PublicChannel
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
import grizzled.slf4j.Logging
import javax.sql.DataSource

import javax.sql.DataSource
import scala.collection.immutable.SortedMap

class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
Expand All @@ -48,7 +49,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node") {
override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO nodes VALUES (?, ?) ON CONFLICT DO NOTHING")) { statement =>
statement.setString(1, n.nodeId.value.toHex)
Expand All @@ -58,7 +59,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node") {
override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("UPDATE nodes SET data=? WHERE node_id=?")) { statement =>
statement.setBytes(1, nodeAnnouncementCodec.encode(n).require.toByteArray)
Expand All @@ -68,7 +69,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node") {
override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT data FROM nodes WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
Expand All @@ -78,7 +79,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node") {
override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("DELETE FROM nodes WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
Expand All @@ -87,7 +88,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes") {
override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT data FROM nodes")
Expand All @@ -96,7 +97,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel") {
override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO channels VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
statement.setLong(1, c.shortChannelId.toLong)
Expand All @@ -108,7 +109,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel") {
override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel", DbBackends.Postgres) {
val column = if (u.isNode1) "channel_update_1" else "channel_update_2"
inTransaction { pg =>
using(pg.prepareStatement(s"UPDATE channels SET $column=? WHERE short_channel_id=?")) { statement =>
Expand All @@ -119,7 +120,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels") {
override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement()) { statement =>
val rs = statement.executeQuery("SELECT channel_announcement, txid, capacity_sat, channel_update_1, channel_update_2 FROM channels")
Expand All @@ -137,7 +138,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels") {
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
shortChannelIds
Expand All @@ -150,7 +151,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned") {
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO pruned VALUES (?) ON CONFLICT DO NOTHING")) { statement =>
shortChannelIds.foreach(shortChannelId => {
Expand All @@ -162,15 +163,15 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
}
}

override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned") {
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.createStatement) { statement =>
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
}
}
}

override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned") {
override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement =>
statement.setLong(1, shortChannelId.toLong)
Expand Down
Loading