Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add router API for invoice routing hints #1590

Merged
merged 3 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, UsableBalance}
import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPaymentRequest, SendPaymentToRouteRequest, SendPaymentToRouteResponse}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{NetworkStats, RouteCalculation}
import fr.acinq.eclair.router.{NetworkStats, RouteCalculation, Router}
import fr.acinq.eclair.wire._
import scodec.bits.ByteVector

Expand Down Expand Up @@ -195,7 +195,7 @@ class EclairImpl(appKit: Kit) extends Eclair {
} yield peerinfos

override def nodes(nodeIds_opt: Option[Set[PublicKey]])(implicit timeout: Timeout): Future[Iterable[NodeAnnouncement]] = {
(appKit.router ? Symbol("nodes"))
(appKit.router ? Router.GetNodes)
.mapTo[Iterable[NodeAnnouncement]]
.map(_.filter(n => nodeIds_opt.forall(_.contains(n.nodeId))))
}
Expand All @@ -216,12 +216,12 @@ class EclairImpl(appKit: Kit) extends Eclair {
}

override def allChannels()(implicit timeout: Timeout): Future[Iterable[ChannelDesc]] = {
(appKit.router ? Symbol("channels")).mapTo[Iterable[ChannelAnnouncement]].map(_.map(c => ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2)))
(appKit.router ? Router.GetChannels).mapTo[Iterable[ChannelAnnouncement]].map(_.map(c => ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2)))
}

override def allUpdates(nodeId_opt: Option[PublicKey])(implicit timeout: Timeout): Future[Iterable[ChannelUpdate]] = nodeId_opt match {
case None => (appKit.router ? Symbol("updates")).mapTo[Iterable[ChannelUpdate]]
case Some(pk) => (appKit.router ? Symbol("channelsMap")).mapTo[Map[ShortChannelId, PublicChannel]].map { channels =>
case None => (appKit.router ? Router.GetChannelUpdates).mapTo[Iterable[ChannelUpdate]]
case Some(pk) => (appKit.router ? Router.GetChannelsMap).mapTo[Map[ShortChannelId, PublicChannel]].map { channels =>
channels.values.flatMap {
case PublicChannel(ann, _, _, Some(u1), _, _) if ann.nodeId1 == pk && u1.isNode1 => List(u1)
case PublicChannel(ann, _, _, _, Some(u2), _) if ann.nodeId2 == pk && !u2.isNode1 => List(u2)
Expand Down
52 changes: 45 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,19 +162,25 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.info("reinstating shortChannelId={} from nodeId={}", shortChannelId, nodeId)
stay using d.copy(excludedChannels = d.excludedChannels - desc)

case Event(Symbol("nodes"), d) =>
case Event(GetNodes, d) =>
sender ! d.nodes.values
stay

case Event(Symbol("channels"), d) =>
case Event(GetLocalChannels, d) =>
val scids = d.graph.getIncomingEdgesOf(nodeParams.nodeId).map(_.desc.shortChannelId)
val localChannels = scids.flatMap(scid => d.channels.get(scid).orElse(d.privateChannels.get(scid))).map(c => LocalChannel(nodeParams.nodeId, c))
sender ! localChannels
stay

case Event(GetChannels, d) =>
sender ! d.channels.values.map(_.ann)
stay

case Event(Symbol("channelsMap"), d) =>
case Event(GetChannelsMap, d) =>
sender ! d.channels
stay

case Event(Symbol("updates"), d) =>
case Event(GetChannelUpdates, d) =>
val updates: Iterable[ChannelUpdate] = d.channels.values.flatMap(d => d.update_1_opt ++ d.update_2_opt) ++ d.privateChannels.values.flatMap(d => d.update_1_opt ++ d.update_2_opt)
sender ! updates
stay
Expand Down Expand Up @@ -286,7 +292,16 @@ object Router {
// @formatter:off
case class ChannelDesc(shortChannelId: ShortChannelId, a: PublicKey, b: PublicKey)
case class ChannelMeta(balance1: MilliSatoshi, balance2: MilliSatoshi)
case class PublicChannel(ann: ChannelAnnouncement, fundingTxid: ByteVector32, capacity: Satoshi, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta_opt: Option[ChannelMeta]) {
sealed trait ChannelDetails {
val capacity: Satoshi
def getNodeIdSameSideAs(u: ChannelUpdate): PublicKey
def getChannelUpdateSameSideAs(u: ChannelUpdate): Option[ChannelUpdate]
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi]
def updateChannelUpdateSameSideAs(u: ChannelUpdate): ChannelDetails
def updateBalances(commitments: AbstractCommitments): ChannelDetails
def applyChannelUpdate(update: Either[LocalChannelUpdate, RemoteChannelUpdate]): ChannelDetails
}
case class PublicChannel(ann: ChannelAnnouncement, fundingTxid: ByteVector32, capacity: Satoshi, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta_opt: Option[ChannelMeta]) extends ChannelDetails {
update_1_opt.foreach(u => assert(Announcements.isNode1(u.channelFlags)))
update_2_opt.foreach(u => assert(!Announcements.isNode1(u.channelFlags)))

Expand All @@ -304,7 +319,7 @@ object Router {
case Right(rcu) => updateChannelUpdateSameSideAs(rcu.channelUpdate)
}
}
case class PrivateChannel(localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) {
case class PrivateChannel(localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) extends ChannelDetails {
val (nodeId1, nodeId2) = if (Announcements.isNode1(localNodeId, remoteNodeId)) (localNodeId, remoteNodeId) else (remoteNodeId, localNodeId)
val capacity: Satoshi = (meta.balance1 + meta.balance2).truncateToSatoshi

Expand All @@ -322,6 +337,24 @@ object Router {
case Right(rcu) => updateChannelUpdateSameSideAs(rcu.channelUpdate)
}
}
case class LocalChannel(localNodeId: PublicKey, channel: ChannelDetails) {
val isPrivate: Boolean = channel match {
case _: PrivateChannel => true
case _ => false
}
val capacity: Satoshi = channel.capacity
val remoteNodeId: PublicKey = channel match {
case c: PrivateChannel => c.remoteNodeId
case c: PublicChannel => if (c.ann.nodeId1 == localNodeId) c.ann.nodeId2 else c.ann.nodeId1
}
/** Our remote peer's channel_update: this is what must be used in invoice routing hints. */
val remoteUpdate: Option[ChannelUpdate] = channel match {
case c: PrivateChannel => if (remoteNodeId == c.nodeId1) c.update_1_opt else c.update_2_opt
case c: PublicChannel => if (remoteNodeId == c.ann.nodeId1) c.update_1_opt else c.update_2_opt
}
/** Create an invoice routing hint from that channel. Note that if the channel is private, the invoice will leak its existence. */
def toExtraHop: Option[ExtraHop] = remoteUpdate.map(u => ExtraHop(remoteNodeId, u.shortChannelId, u.feeBaseMsat, u.feeProportionalMillionths, u.cltvExpiryDelta))
}
// @formatter:on

case class AssistedChannel(extraHop: ExtraHop, nextNodeId: PublicKey, htlcMaximum: MilliSatoshi)
Expand Down Expand Up @@ -466,6 +499,11 @@ object Router {
case object GetRoutingStateStreaming
case object RoutingStateStreamingUpToDate
case object GetRouterData
case object GetNodes
case object GetLocalChannels
case object GetChannels
case object GetChannelsMap
case object GetChannelUpdates
t-bast marked this conversation as resolved.
Show resolved Hide resolved
// @formatter:on

// @formatter:off
Expand Down Expand Up @@ -508,7 +546,7 @@ object Router {
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[RemoteGossip]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel], // short_channel_id -> node_id
privateChannels: Map[ShortChannelId, PrivateChannel],
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I

{
val fRes = eclair.nodes()
router.expectMsg(Symbol("nodes"))
router.expectMsg(Router.GetNodes)
router.reply(allNodes)
awaitCond(fRes.value match {
case Some(Success(nodes)) =>
Expand All @@ -179,7 +179,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
}
{
val fRes = eclair.nodes(Some(Set(remoteNodeAnn1.nodeId, remoteNodeAnn2.nodeId)))
router.expectMsg(Symbol("nodes"))
router.expectMsg(Router.GetNodes)
router.reply(allNodes)
awaitCond(fRes.value match {
case Some(Success(nodes)) =>
Expand All @@ -190,7 +190,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
}
{
val fRes = eclair.nodes(Some(Set(randomKey.publicKey)))
router.expectMsg(Symbol("nodes"))
router.expectMsg(Router.GetNodes)
router.reply(allNodes)
awaitCond(fRes.value match {
case Some(Success(nodes)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.receive.{ForwardHandler, PaymentHandler}
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentRequest
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.transactions.{Scripts, Transactions}
import fr.acinq.eclair.wire.{ChannelAnnouncement, ChannelUpdate, PermanentChannelFailure, UpdateAddHtlc}
import fr.acinq.eclair.{LongToBtcAmount, MilliSatoshi, randomBytes32}
Expand All @@ -53,11 +54,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
def awaitAnnouncements(channels: Int): Unit = {
val sender = TestProbe()
awaitCond({
sender.send(nodes("A").router, Symbol("channels"))
sender.send(nodes("A").router, Router.GetChannels)
sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
}, max = 60 seconds, interval = 1 second)
awaitCond({
sender.send(nodes("A").router, Symbol("updates"))
sender.send(nodes("A").router, Router.GetChannelUpdates)
sender.expectMsgType[Iterable[ChannelUpdate]].size == 2 * channels
}, max = 60 seconds, interval = 1 second)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ class PaymentIntegrationSpec extends IntegrationSpec {
subset.foreach {
case (_, setup) =>
awaitCond({
sender.send(setup.router, Symbol("nodes"))
sender.send(setup.router, Router.GetNodes)
sender.expectMsgType[Iterable[NodeAnnouncement]].size == nodes
}, max = 60 seconds, interval = 1 second)
awaitCond({
sender.send(setup.router, Symbol("channels"))
sender.send(setup.router, Router.GetChannels)
sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels
}, max = 60 seconds, interval = 1 second)
awaitCond({
sender.send(setup.router, Symbol("updates"))
sender.send(setup.router, Router.GetChannelUpdates)
sender.expectMsgType[Iterable[ChannelUpdate]].size == updates
}, max = 60 seconds, interval = 1 second)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {
val sender = TestProbe()
// to simulate this, we will update B's relay params
// first we find out the short channel id for channel B-C
sender.send(nodes("B").router, Symbol("channels"))
sender.send(nodes("B").router, Router.GetChannels)
val shortIdBC = sender.expectMsgType[Iterable[ChannelAnnouncement]].find(c => Set(c.nodeId1, c.nodeId2) == Set(nodes("B").nodeParams.nodeId, nodes("C").nodeParams.nodeId)).get.shortChannelId
// we also need the full commitment
nodes("B").register ! Register.ForwardShortId(sender.ref, shortIdBC, CMD_GETINFO(ActorRef.noSender))
Expand All @@ -193,7 +193,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {

awaitCond({
// in the meantime, the router will have updated its state
sender.send(nodes("A").router, Symbol("channelsMap"))
sender.send(nodes("A").router, Router.GetChannelsMap)
// we then put everything back like before by asking B to refresh its channel update (this will override the one we created)
val u_opt = updateFor(nodes("B").nodeParams.nodeId, sender.expectMsgType[Map[ShortChannelId, PublicChannel]](10 seconds).apply(channelUpdateBC.shortChannelId))
u_opt.contains(channelUpdateBC)
Expand All @@ -209,7 +209,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {
assert(channelUpdateBC_new.timestamp > channelUpdateBC.timestamp)
assert(channelUpdateBC_new.cltvExpiryDelta == nodes("B").nodeParams.expiryDelta)
awaitCond({
sender.send(nodes("A").router, Symbol("channelsMap"))
sender.send(nodes("A").router, Router.GetChannelsMap)
val u = updateFor(nodes("B").nodeParams.nodeId, sender.expectMsgType[Map[ShortChannelId, PublicChannel]].apply(channelUpdateBC.shortChannelId)).get
u.cltvExpiryDelta == nodes("B").nodeParams.expiryDelta
}, max = 30 seconds, interval = 1 second)
Expand Down Expand Up @@ -660,7 +660,7 @@ class PaymentIntegrationSpec extends IntegrationSpec {
sender.expectMsg(GossipDecision.Accepted(ann))
}
awaitCond({
sender.send(nodes("D").router, Symbol("channels"))
sender.send(nodes("D").router, Router.GetChannels)
sender.expectMsgType[Iterable[ChannelAnnouncement]].size == channels.size + 8 // 8 original channels (A -> B is private)
}, max = 120 seconds, interval = 1 second)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyMa
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.router.Announcements._
import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelMeta, GossipDecision, PrivateChannel}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{TestKitBaseClass, randomKey, _}
Expand Down Expand Up @@ -194,11 +194,11 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi
GossipDecision.Accepted(node_h))
peerConnection.expectNoMsg()
awaitCond({
sender.send(router, Symbol("nodes"))
sender.send(router, GetNodes)
val nodes = sender.expectMsgType[Iterable[NodeAnnouncement]]
sender.send(router, Symbol("channels"))
sender.send(router, GetChannels)
val channels = sender.expectMsgType[Iterable[ChannelAnnouncement]]
sender.send(router, Symbol("updates"))
sender.send(router, GetChannelUpdates)
val updates = sender.expectMsgType[Iterable[ChannelUpdate]]
nodes.size === 8 && channels.size === 5 && updates.size === 11
}, max = 10 seconds, interval = 1 second)
Expand Down
18 changes: 18 additions & 0 deletions eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,24 @@ class RouterSpec extends BaseRouterSpec {
state.channels.foreach(c => assert(c.capacity === publicChannelCapacity))
}

test("send local channels") { fixture =>
import fixture._
// We need a channel update from our private remote peer, otherwise we can't create invoice routing information.
val peerConnection = TestProbe()
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, g, update_ga))
val sender = TestProbe()
sender.send(router, GetLocalChannels)
val localChannels = sender.expectMsgType[Seq[LocalChannel]]
assert(localChannels.size === 2)
assert(localChannels.map(_.remoteNodeId).toSet === Set(b, g))
assert(localChannels.exists(_.isPrivate)) // a ---> g
assert(localChannels.exists(!_.isPrivate)) // a ---> b
assert(localChannels.flatMap(_.toExtraHop).toSet === Set(
ExtraHop(b, channelId_ab, update_ba.feeBaseMsat, update_ba.feeProportionalMillionths, update_ba.cltvExpiryDelta),
ExtraHop(g, channelId_ag, update_ga.feeBaseMsat, update_ga.feeProportionalMillionths, update_ga.cltvExpiryDelta)
))
}

test("send network statistics") { fixture =>
import fixture._
val sender = TestProbe()
Expand Down