Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made sync params configurable #1124

Merged
merged 4 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ eclair {
sync {
request-node-announcements = true // if true we will ask for node announcements when we receive channel ids that we don't know
encoding-type = zlib // encoding for short_channel_ids and timestamps in query channel sync messages; other possible value is "uncompressed"
channel-range-chunk-size = 2500 // do not change this unless you know what you are doing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also keep the previous message? I think it was useful

pm47 marked this conversation as resolved.
Show resolved Hide resolved
pm47 marked this conversation as resolved.
Show resolved Hide resolved
channel-query-chunk-size = 100 // do not change this unless you know what you are doing
pm47 marked this conversation as resolved.
Show resolved Hide resolved
}

// the values below will be used to perform route searching
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ object NodeParams {
randomizeRouteSelection = config.getBoolean("router.randomize-route-selection"),
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = routerSyncEncodingType,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
t-bast marked this conversation as resolved.
Show resolved Hide resolved
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
searchMaxRouteLength = config.getInt("router.path-finding.max-route-length"),
searchMaxCltv = CltvExpiryDelta(config.getInt("router.path-finding.max-cltv")),
searchMaxFeeBase = Satoshi(config.getLong("router.path-finding.fee-threshold-sat")),
Expand Down
14 changes: 6 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ case class RouterConf(randomizeRouteSelection: Boolean,
routerBroadcastInterval: FiniteDuration,
requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
searchMaxFeeBase: Satoshi,
searchMaxFeePct: Double,
searchMaxRouteLength: Int,
Expand Down Expand Up @@ -163,8 +165,6 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
setTimer(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval, repeat = true)
setTimer(TickPruneStaleChannels.toString, TickPruneStaleChannels, 1 hour, repeat = true)

val SHORTID_WINDOW = 100

val defaultRouteParams = getDefaultRouteParams(nodeParams.routerConf)

val db = nodeParams.db.network
Expand Down Expand Up @@ -541,7 +541,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
// keep channel ids that are in [firstBlockNum, firstBlockNum + numberOfBlocks]
val shortChannelIds: SortedSet[ShortChannelId] = d.channels.keySet.filter(keep(firstBlockNum, numberOfBlocks, _))
log.info("replying with {} items for range=({}, {})", shortChannelIds.size, firstBlockNum, numberOfBlocks)
split(shortChannelIds)
split(shortChannelIds, nodeParams.routerConf.channelRangeChunkSize)
.foreach(chunk => {
val (timestamps, checksums) = routingMessage.queryFlags_opt match {
case Some(extension) if extension.wantChecksums | extension.wantTimestamps =>
Expand Down Expand Up @@ -589,7 +589,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", shortChannelIds.array.size, channelCount, updatesCount, shortChannelIds.encoding)
// we update our sync data to this node (there may be multiple channel range responses and we can only query one set of ids at a time)
val replies = shortChannelIdAndFlags
.grouped(SHORTID_WINDOW)
.grouped(nodeParams.routerConf.channelQueryChunkSize)
.map(chunk => QueryShortChannelIds(chainHash,
shortChannelIds = EncodedShortChannelIds(shortChannelIds.encoding, chunk.map(_.shortChannelId)),
if (routingMessage.timestamps_opt.isDefined || routingMessage.checksums_opt.isDefined)
Expand Down Expand Up @@ -811,7 +811,6 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
}

object Router {
val SHORTID_WINDOW = 100

def props(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) = Props(new Router(nodeParams, watcher, initialized))

Expand Down Expand Up @@ -1070,15 +1069,14 @@ object Router {
* @param shortChannelIds
* @return
*/
def split(shortChannelIds: SortedSet[ShortChannelId]): List[ShortChannelIdsChunk] = {
def split(shortChannelIds: SortedSet[ShortChannelId], channelRangeChunkSize: Int): List[ShortChannelIdsChunk] = {
// this algorithm can split blocks (meaning that we can in theory generate several replies with the same first_block/num_blocks
// and a different set of short_channel_ids) but it doesn't matter
val SPLIT_SIZE = 3500 // we can theoretically fit 4091 uncompressed channel ids in a single lightning message (max size 65 Kb)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sstone pretty sure this calculation was wrong, can you reproduce it?

if (shortChannelIds.isEmpty) {
List(ShortChannelIdsChunk(0, 0, List.empty))
} else {
shortChannelIds
.grouped(SPLIT_SIZE)
.grouped(channelRangeChunkSize)
.toList
.map { group =>
// NB: group is never empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ object ReplyChannelRangeTlv {
case class EncodedChecksums(checksums: List[Checksums]) extends ReplyChannelRangeTlv

val timestampsCodec: Codec[Timestamps] = (
("checksum1" | uint32) ::
("checksum2" | uint32)
("timestamp1" | uint32) ::
("timestamp2" | uint32)
).as[Timestamps]

val encodedTimestampsCodec: Codec[EncodedTimestamps] = variableSizeBytesLong(varintoverflow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ object TestConstants {
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
encodingType = EncodingType.COMPRESSED_ZLIB,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
searchMaxFeeBase = 21 sat,
searchMaxFeePct = 0.03,
searchMaxCltv = CltvExpiryDelta(2016),
Expand Down Expand Up @@ -182,6 +184,8 @@ object TestConstants {
routerBroadcastInterval = 5 seconds,
requestNodeAnnouncements = true,
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = 20,
channelQueryChunkSize = 5,
searchMaxFeeBase = 21 sat,
searchMaxFeePct = 0.03,
searchMaxCltv = CltvExpiryDelta(2016),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {

val fakeRoutingInfo: TreeMap[ShortChannelId, (PublicChannel, NodeAnnouncement, NodeAnnouncement)] = RoutingSyncSpec
.shortChannelIds
.take(4567)
.take(60)
.foldLeft(TreeMap.empty[ShortChannelId, (PublicChannel, NodeAnnouncement, NodeAnnouncement)]) {
case (m, shortChannelId) => m + (shortChannelId -> makeFakeRoutingInfo(shortChannelId))
}
Expand Down Expand Up @@ -137,25 +137,25 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
awaitCond(alice.stateData.nodes === bob.stateData.nodes)

// add some channels and updates to bob and resync
fakeRoutingInfo.take(40).values.foreach {
fakeRoutingInfo.take(10).values.foreach {
case (pc, na1, na2) =>
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
// we don't send channel_update #2
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
}
awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 40)
assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10)
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels)

// add some updates to bob and resync
fakeRoutingInfo.take(40).values.foreach {
fakeRoutingInfo.take(10).values.foreach {
case (pc, na1, na2) =>
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
}
awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 80)
assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 80, nodes = 80) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2)
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10 * 2, nodes = 10 * 2) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels)

// add everything (duplicates will be ignored)
Expand All @@ -168,7 +168,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
}
awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds)
assert(BasicSyncResult(ranges = 2, queries = 46, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts)
assert(BasicSyncResult(ranges = 3, queries = 12, channels = fakeRoutingInfo.size, updates = 2 * fakeRoutingInfo.size, nodes = 2 * fakeRoutingInfo.size) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)
}

Expand All @@ -185,26 +185,26 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
awaitCond(alice.stateData.channels === bob.stateData.channels)

// add some channels and updates to bob and resync
fakeRoutingInfo.take(40).values.foreach {
fakeRoutingInfo.take(10).values.foreach {
case (pc, na1, na2) =>
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.ann))
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_1_opt.get))
// we don't send channel_update #2
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na1))
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
}
awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 40)
assert(BasicSyncResult(ranges = 1, queries = 1, channels = 40, updates = 40, nodes = if (requestNodeAnnouncements) 80 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10)
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 10, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)
if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes)

// add some updates to bob and resync
fakeRoutingInfo.take(40).values.foreach {
fakeRoutingInfo.take(10).values.foreach {
case (pc, na1, na2) =>
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, pc.update_2_opt.get))
}
awaitCond(bob.stateData.channels.size === 40 && countUpdates(bob.stateData.channels) === 80)
assert(BasicSyncResult(ranges = 1, queries = 1, channels = 0, updates = 40, nodes = if (requestNodeAnnouncements) 80 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(bob.stateData.channels.size === 10 && countUpdates(bob.stateData.channels) === 10 * 2)
assert(BasicSyncResult(ranges = 1, queries = 2, channels = 0, updates = 10, nodes = if (requestNodeAnnouncements) 10 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)

// add everything (duplicates will be ignored)
Expand All @@ -217,7 +217,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, na2))
}
awaitCond(bob.stateData.channels.size === fakeRoutingInfo.size && countUpdates(bob.stateData.channels) === 2 * fakeRoutingInfo.size, max = 60 seconds)
assert(BasicSyncResult(ranges = 2, queries = 46, channels = fakeRoutingInfo.size - 40, updates = 2 * (fakeRoutingInfo.size - 40), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 40) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
assert(BasicSyncResult(ranges = 3, queries = 10, channels = fakeRoutingInfo.size - 10, updates = 2 * (fakeRoutingInfo.size - 10), nodes = if (requestNodeAnnouncements) 2 * (fakeRoutingInfo.size - 10) else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)

// bump random channel_updates
Expand All @@ -226,9 +226,9 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
makeNewerChannelUpdate(c, if (side) u1 else u2)
}

val bumpedUpdates = (List(0, 42, 147, 153, 654, 834, 4301).map(touchUpdate(_, true)) ++ List(1, 42, 150, 200).map(touchUpdate(_, false))).toSet
val bumpedUpdates = (List(0, 3, 7).map(touchUpdate(_, true)) ++ List(1, 3, 9).map(touchUpdate(_, false))).toSet
bumpedUpdates.foreach(c => sender.send(bob, PeerRoutingMessage(sender.ref, charlieId, c)))
assert(BasicSyncResult(ranges = 2, queries = 2, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 20 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
assert(BasicSyncResult(ranges = 3, queries = 1, channels = 0, updates = bumpedUpdates.size, nodes = if (requestNodeAnnouncements) 5 * 2 else 0) === sync(alice, bob, extendedQueryFlags_opt).counts)
awaitCond(alice.stateData.channels === bob.stateData.channels, max = 60 seconds)
if (requestNodeAnnouncements) awaitCond(alice.stateData.nodes === bob.stateData.nodes)
}
Expand All @@ -254,7 +254,7 @@ class RoutingSyncSpec extends TestKit(ActorSystem("test")) with FunSuiteLike {
val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange]
sender.expectMsgType[GossipTimestampFilter]

val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(100).keys.toList), None, None)
val block1 = ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, 1, EncodedShortChannelIds(EncodingType.UNCOMPRESSED, fakeRoutingInfo.take(params.routerConf.channelQueryChunkSize).keys.toList), None, None)

// send first block
sender.send(router, PeerRoutingMessage(transport.ref, remoteNodeId, block1))
Expand Down