Skip to content

Commit

Permalink
Modify to work with PeerSwap as a plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
remyers committed Oct 20, 2022
1 parent 1270c07 commit 32c2a96
Show file tree
Hide file tree
Showing 22 changed files with 63 additions and 289 deletions.
22 changes: 0 additions & 22 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.PreimageReceived
import fr.acinq.eclair.payment.send.PaymentInitiator._
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.swap.SwapRegister
import fr.acinq.eclair.swap.SwapResponses.{Response, Status}
import fr.acinq.eclair.wire.protocol.MessageOnionCodecs.blindedRouteCodec
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -165,14 +163,6 @@ trait Eclair {
def sendOnionMessage(intermediateNodes: Seq[PublicKey], destination: Either[PublicKey, Sphinx.RouteBlinding.BlindedRoute], replyPath: Option[Seq[PublicKey]], userCustomContent: ByteVector)(implicit timeout: Timeout): Future[SendOnionMessageResponse]

def stop(): Future[Unit]

def swapIn(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response]

def swapOut(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response]

def listSwaps()(implicit timeout: Timeout): Future[Iterable[Status]]

def cancelSwap(swapId: String)(implicit timeout: Timeout): Future[Response]
}

class EclairImpl(appKit: Kit) extends Eclair with Logging {
Expand Down Expand Up @@ -590,16 +580,4 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
sys.exit(0)
Future.successful(())
}

override def swapIn(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response] =
appKit.swapRegister.ask(ref => SwapRegister.SwapInRequested(ref, amount, shortChannelId))(timeout, appKit.system.scheduler.toTyped)

override def swapOut(shortChannelId: ShortChannelId, amount: Satoshi)(implicit timeout: Timeout): Future[Response] =
appKit.swapRegister.ask(ref => SwapRegister.SwapOutRequested(ref, amount, shortChannelId))(timeout, appKit.system.scheduler.toTyped)

override def listSwaps()(implicit timeout: Timeout): Future[Iterable[Status]] =
appKit.swapRegister.ask(ref => SwapRegister.ListPendingSwaps(ref))(timeout, appKit.system.scheduler.toTyped)

override def cancelSwap(swapId: String)(implicit timeout: Timeout): Future[Response] =
appKit.swapRegister.ask(ref => SwapRegister.CancelSwapRequested(ref, swapId))(timeout, appKit.system.scheduler.toTyped)
}
9 changes: 2 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import fr.acinq.eclair.router.Announcements.AddressException
import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios}
import fr.acinq.eclair.router.PathFindingExperimentConf
import fr.acinq.eclair.router.Router.{MultiPartParams, PathFindingConf, RouterConf, SearchBoundaries}
import fr.acinq.eclair.swap.SwapKeyManager
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
Expand All @@ -55,7 +54,6 @@ import scala.jdk.CollectionConverters._
*/
case class NodeParams(nodeKeyManager: NodeKeyManager,
channelKeyManager: ChannelKeyManager,
swapKeyManager: SwapKeyManager,
instanceId: UUID, // a unique instance ID regenerated after each restart
private val blockHeight: AtomicLong,
alias: String,
Expand Down Expand Up @@ -142,7 +140,6 @@ object NodeParams extends Logging {
val oldSeedPath = new File(datadir, "seed.dat")
val nodeSeedFilename: String = "node_seed.dat"
val channelSeedFilename: String = "channel_seed.dat"
val swapSeedFilename: String = "swap_seed.dat"

def getSeed(filename: String): ByteVector = {
val seedPath = new File(datadir, filename)
Expand All @@ -160,8 +157,7 @@ object NodeParams extends Logging {

val nodeSeed = getSeed(nodeSeedFilename)
val channelSeed = getSeed(channelSeedFilename)
val swapSeed = getSeed(swapSeedFilename)
Seeds(nodeSeed, channelSeed, swapSeed)
Seeds(nodeSeed, channelSeed)
}

private val chain2Hash: Map[String, ByteVector32] = Map(
Expand Down Expand Up @@ -192,7 +188,7 @@ object NodeParams extends Logging {
}
}

def makeNodeParams(config: Config, instanceId: UUID, nodeKeyManager: NodeKeyManager, channelKeyManager: ChannelKeyManager, swapKeyManager: SwapKeyManager,
def makeNodeParams(config: Config, instanceId: UUID, nodeKeyManager: NodeKeyManager, channelKeyManager: ChannelKeyManager,
torAddress_opt: Option[NodeAddress], database: Databases, blockHeight: AtomicLong, feeEstimator: FeeEstimator,
pluginParams: Seq[PluginParams] = Nil): NodeParams = {
// check configuration for keys that have been renamed
Expand Down Expand Up @@ -426,7 +422,6 @@ object NodeParams extends Logging {
NodeParams(
nodeKeyManager = nodeKeyManager,
channelKeyManager = channelKeyManager,
swapKeyManager = swapKeyManager,
instanceId = instanceId,
blockHeight = blockHeight,
alias = nodeAlias,
Expand Down
17 changes: 6 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.Relayer
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
import fr.acinq.eclair.router._
import fr.acinq.eclair.swap.{LocalSwapKeyManager, SwapRegister}
import fr.acinq.eclair.tor.{Controller, TorProtocolHandler}
import fr.acinq.eclair.wire.protocol.NodeAddress
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -94,13 +93,12 @@ class Setup(val datadir: File,

datadir.mkdirs()
val config = system.settings.config.getConfig("eclair")
val Seeds(nodeSeed, channelSeed, swapSeed) = seeds_opt.getOrElse(NodeParams.getSeeds(datadir))
val Seeds(nodeSeed, channelSeed) = seeds_opt.getOrElse(NodeParams.getSeeds(datadir))
val chain = config.getString("chain")
val chaindir = new File(datadir, chain)
chaindir.mkdirs()
val nodeKeyManager = new LocalNodeKeyManager(nodeSeed, NodeParams.hashFromChain(chain))
val channelKeyManager = new LocalChannelKeyManager(channelSeed, NodeParams.hashFromChain(chain))
val swapKeyManager = new LocalSwapKeyManager(swapSeed, NodeParams.hashFromChain(chain))
val instanceId = UUID.randomUUID()

logger.info(s"instanceid=$instanceId")
Expand Down Expand Up @@ -134,7 +132,7 @@ class Setup(val datadir: File,
// @formatter:on
}

val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, swapKeyManager, initTor(), databases, blockHeight, feeEstimator, pluginParams)
val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, initTor(), databases, blockHeight, feeEstimator, pluginParams)
pluginParams.foreach(param => logger.info(s"using plugin=${param.name}"))

val serverBindingAddress = new InetSocketAddress(config.getString("server.binding-ip"), config.getInt("server.port"))
Expand Down Expand Up @@ -306,8 +304,7 @@ class Setup(val datadir: File,
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory)
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
swapRegister = system.spawn(Behaviors.supervise(SwapRegister(nodeParams, paymentInitiator, watcher, register, bitcoinClient, nodeParams.db.swaps.restore().toSet)).onFailure(typed.SupervisorStrategy.resume), "swap-register")
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, swapRegister)
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
clientSpawner = system.actorOf(SimpleSupervisor.props(ClientSpawner.props(nodeParams.keyPair, nodeParams.socksProxy_opt, nodeParams.peerConnectionConf, switchboard, router), "client-spawner", SupervisorStrategy.Restart))
Expand All @@ -332,8 +329,7 @@ class Setup(val datadir: File,
channelsListener = channelsListener,
balanceActor = balanceActor,
postman = postman,
wallet = bitcoinClient,
swapRegister = swapRegister)
wallet = bitcoinClient)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
zmqTxTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -384,7 +380,7 @@ class Setup(val datadir: File,

object Setup {

final case class Seeds(nodeSeed: ByteVector, channelSeed: ByteVector, swapSeed: ByteVector)
final case class Seeds(nodeSeed: ByteVector, channelSeed: ByteVector)

}

Expand All @@ -401,8 +397,7 @@ case class Kit(nodeParams: NodeParams,
channelsListener: typed.ActorRef[ChannelsListener.Command],
balanceActor: typed.ActorRef[BalanceActor.Command],
postman: typed.ActorRef[Postman.Command],
wallet: OnChainWallet,
swapRegister: typed.ActorRef[SwapRegister.Command])
wallet: OnChainWallet)

object Kit {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1642,7 +1642,9 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, val
log.warning(s"processing local commit spent in catch-all handler")
spendLocalCurrent(d)

case Event(msg: HasSwapId, _) => send(msg)
// forward unknown messages that originate from loaded plugins
case Event(unknownMsg: UnknownMessage, _) if nodeParams.pluginMessageTags.contains(unknownMsg.tag) =>
send(unknownMsg)
stay()
}

Expand Down
5 changes: 0 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ trait Databases {
def peers: PeersDb
def payments: PaymentsDb
def pendingCommands: PendingCommandsDb
def swaps: SwapsDb
//@formatter:on
}

Expand All @@ -66,7 +65,6 @@ object Databases extends Logging {
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingCommands: SqlitePendingCommandsDb,
swaps: SqliteSwapsDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
Expand All @@ -85,7 +83,6 @@ object Databases extends Logging {
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
swaps = new SqliteSwapsDb(eclairJdbc),
backupConnection = eclairJdbc
)
}
Expand All @@ -98,7 +95,6 @@ object Databases extends Logging {
payments: PgPaymentsDb,
pendingCommands: PgPendingCommandsDb,
dataSource: HikariDataSource,
swaps: PgSwapsDb,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
}
Expand Down Expand Up @@ -158,7 +154,6 @@ object Databases extends Logging {
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingCommands = new PgPendingCommandsDb,
swaps = new PgSwapsDb,
dataSource = ds,
lock = lock)

Expand Down
34 changes: 0 additions & 34 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import fr.acinq.eclair.db.DualDatabases.runAsync
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.swap.SwapData
import fr.acinq.eclair.swap.SwapEvents.SwapEvent
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, RealShortChannelId, ShortChannelId, TimestampMilli}
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -41,8 +39,6 @@ case class DualDatabases(primary: Databases, secondary: Databases) extends Datab

override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)

override val swaps: SwapsDb = DualSwapsDb(primary.swaps, secondary.swaps)

/** if one of the database supports file backup, we use it */
override def backup(backupFile: File): Unit = (primary, secondary) match {
case (f: FileBackup, _) => f.backup(backupFile)
Expand Down Expand Up @@ -392,33 +388,3 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC
primary.listSettlementCommands()
}
}

case class DualSwapsDb(primary: SwapsDb, secondary: SwapsDb) extends SwapsDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))

override def add(swapData: SwapData): Unit = {
runAsync(secondary.add(swapData))
primary.add(swapData)
}

override def addResult(swapEvent: SwapEvent): Unit = {
runAsync(secondary.addResult(swapEvent))
primary.addResult(swapEvent)
}

override def remove(swapId: String): Unit = {
runAsync(secondary.remove(swapId))
primary.remove(swapId)
}

override def restore(): Seq[SwapData] = {
runAsync(secondary.restore())
primary.restore()
}

override def list(): Seq[SwapData] = {
runAsync(secondary.list())
primary.list()
}
}
13 changes: 4 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.io.Switchboard.RelayMessage
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.swap.SwapRegister
import fr.acinq.eclair.swap.SwapRegister.MessageReceived
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasSwapId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -57,7 +55,7 @@ import scala.util.{Failure, Success}
*
* Created by PM on 26/08/2016.
*/
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory, switchboard: ActorRef, swapRegister: typed.ActorRef[SwapRegister.Command]) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory, switchboard: ActorRef) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {

import Peer._

Expand Down Expand Up @@ -300,10 +298,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
replyTo_opt.foreach(_ ! MessageRelay.Sent(messageId))
stay()

case Event(message: HasSwapId, d: ConnectedData) =>
swapRegister ! MessageReceived(message)
stay()

// TODO: plugin actors should register to receive messages with certain tags
case Event(unknownMsg: UnknownMessage, d: ConnectedData) if nodeParams.pluginMessageTags.contains(unknownMsg.tag) =>
context.system.eventStream.publish(UnknownMessageReceived(self, remoteNodeId, unknownMsg, d.connectionInfo))
stay()
Expand Down Expand Up @@ -494,7 +489,7 @@ object Peer {
context.actorOf(Channel.props(nodeParams, wallet, remoteNodeId, watcher, relayer, txPublisherFactory, origin_opt))
}

def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory, switchboard: ActorRef, swapRegister: typed.ActorRef[SwapRegister.Command]): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard, swapRegister))
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory, switchboard: ActorRef): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard))

// @formatter:off

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import fr.acinq.eclair.io.MessageRelay.RelayPolicy
import fr.acinq.eclair.io.Peer.PeerInfoResponse
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.router.Router.RouterConf
import fr.acinq.eclair.swap.SwapRegister
import fr.acinq.eclair.wire.protocol.OnionMessage
import fr.acinq.eclair.{SubscriptionsComplete, NodeParams}
import fr.acinq.eclair.{NodeParams, SubscriptionsComplete}

/**
* Ties network connections to peers.
Expand Down Expand Up @@ -153,9 +152,9 @@ object Switchboard {
def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef
}

case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory, swapRegister: typed.ActorRef[SwapRegister.Command]) extends PeerFactory {
case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory) extends PeerFactory {
override def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef =
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self, swapRegister), name = peerActorName(remoteNodeId))
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self), name = peerActorName(remoteNodeId))
}

def props(nodeParams: NodeParams, peerFactory: PeerFactory) = Props(new Switchboard(nodeParams, peerFactory))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package fr.acinq.eclair.transactions

import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.bitcoin.SigHash._
import fr.acinq.bitcoin.SigVersion._
import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey, ripemd160}
import fr.acinq.bitcoin.scalacompat.Script._
import fr.acinq.bitcoin.scalacompat._
import fr.acinq.bitcoin.SigHash._
import fr.acinq.bitcoin.SigVersion._
import fr.acinq.bitcoin.ScriptFlags
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.transactions.CommitmentOutput._
Expand Down Expand Up @@ -100,7 +100,7 @@ object Transactions {
case object Remote extends TxOwner
}

sealed trait TransactionWithInputInfo {
trait TransactionWithInputInfo {
def input: InputInfo
def desc: String
def tx: Transaction
Expand Down Expand Up @@ -163,10 +163,6 @@ object Transactions {
sealed trait TxGenerationSkipped
case object OutputNotFound extends TxGenerationSkipped { override def toString = "output not found (probably trimmed)" }
case object AmountBelowDustLimit extends TxGenerationSkipped { override def toString = "amount is below dust limit" }

case class SwapClaimByInvoiceTx(input: InputInfo, tx: Transaction) extends TransactionWithInputInfo { override def desc: String = "swap-claimbyinvoice-tx" }
case class SwapClaimByCoopTx(input: InputInfo, tx: Transaction) extends TransactionWithInputInfo { override def desc: String = "swap-claimbycoop-tx" }
case class SwapClaimByCsvTx(input: InputInfo, tx: Transaction) extends TransactionWithInputInfo { override def desc: String = "swap-claimbycsv-tx" }
// @formatter:on

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package fr.acinq.eclair.wire.protocol
import fr.acinq.bitcoin.scalacompat.ScriptWitness
import fr.acinq.eclair.wire.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.PeerSwapMessageCodecs._
import fr.acinq.eclair.{Feature, Features, InitFeature, KamonExt}
import scodec.bits.{BitVector, ByteVector, HexStringSyntax}
import scodec.codecs._
Expand Down Expand Up @@ -464,14 +463,7 @@ object LightningMessageCodecs {
.typecase(264, replyChannelRangeCodec)
.typecase(265, gossipTimestampFilterCodec)
.typecase(513, onionMessageCodec)
// TODO: move PeerSwap message handling to a plugin
.typecase(42069, swapInRequestCodec)
.typecase(42071, swapOutRequestCodec)
.typecase(42073, swapInAgreementCodec)
.typecase(42075, swapOutAgreementCodec)
.typecase(42077, openingTxBroadcastedCodec)
.typecase(42079, canceledCodec)
.typecase(42081, coopCloseCodec)

// NB: blank lines to minimize merge conflicts

//
Expand Down
Loading

0 comments on commit 32c2a96

Please sign in to comment.