Skip to content

Commit

Permalink
Extended Queries: use TLV format for optional data (#1072)
Browse files Browse the repository at this point in the history
* Extended Queries: use TLV format for optional data

Optional query extensions now use TLV instead of a custom format.
Flags are encoded as varint instead of bytes as originally proposed. With the current proposal they will all fit on a single byte, but will be
much easier to extends this way.

* Move query message TLVs to their own namespace

We add one new class for each TLV type, with specific TLV types, and encapsulate codecs.

* Optional TLVs are represented as a list, not an optional list

TLVs that extend regular LN messages can be represented as a TlvStream and not an Option[TlvStream] since we don't need
to explicitely terminate the stream (either by preprending its length or using a specific terminator) as we do in Onion TLVs.

No TLVs simply means that the TLV stream is empty.

* Update to match  BOLT PR

Checksums in ReplyChannelRange now have the same encoding as short channel ids and timestamps: one byte for
the encoding type (uncompressed or zlib) followed by encoded data.

* TLV Stream: Implement a generic "get" method for TLV fields

If a have a TLV stream of type MyTLV which is a subtype of TLV, and MyTLV1 and MYTLV2 are both
subtypes of MyTLV then we can use stream.get[MyTLV1] to get the TLV record of type MYTLV1 (if any)
in our TLV stream.

* Extended range queries: Implement latest BOLT changes

Checksums are just transmitted as a raw array, with optional compression as it would be useless here.

* Use extended range queries on regtest and testnet

We will use them on mainnet as soon as lightning/bolts#557 has been merged.

* Address review comments

* Router: rework handling of ReplyChannelRange

We remove the ugly and inefficient zipWithIndex we had before

* NodeParams: move fee base check to its proper place

* Router: minor cleanup
  • Loading branch information
sstone authored and pm47 committed Aug 22, 2019
1 parent d321a21 commit 0780fc2
Show file tree
Hide file tree
Showing 16 changed files with 509 additions and 191 deletions.
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import fr.acinq.eclair.router.RouterConf
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.{Color, NodeAddress}
import scodec.bits.ByteVector

import scala.collection.JavaConversions._
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -76,7 +77,6 @@ case class NodeParams(keyManager: KeyManager,
routerConf: RouterConf,
socksProxy_opt: Option[Socks5ProxyParams],
maxPaymentAttempts: Int) {

val privateKey = keyManager.nodeKey.privateKey
val nodeId = keyManager.nodeId
}
Expand Down Expand Up @@ -186,6 +186,11 @@ object NodeParams {
claimMainBlockTarget = config.getInt("on-chain-fees.target-blocks.claim-main")
)

val feeBase = MilliSatoshi(config.getInt("fee-base-msat"))
// fee base is in msat but is encoded on 32 bits and not 64 in the BOLTs, which is why it has
// to be below 0x100000000 msat which is about 42 mbtc
require(feeBase <= MilliSatoshi(0xFFFFFFFFL), "fee-base-msat must be below 42 mbtc")

NodeParams(
keyManager = keyManager,
alias = nodeAlias,
Expand All @@ -209,7 +214,7 @@ object NodeParams {
toRemoteDelayBlocks = config.getInt("to-remote-delay-blocks"),
maxToLocalDelayBlocks = config.getInt("max-to-local-delay-blocks"),
minDepthBlocks = config.getInt("mindepth-blocks"),
feeBase = MilliSatoshi(config.getInt("fee-base-msat")),
feeBase = feeBase,
feeProportionalMillionth = config.getInt("fee-proportional-millionths"),
reserveToFundingRatio = config.getDouble("reserve-to-funding-ratio"),
maxReserveToFundingRatio = config.getDouble("max-reserve-to-funding-ratio"),
Expand Down
13 changes: 10 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import akka.event.Logging.MDC
import akka.util.Timeout
import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, Protocol, Satoshi}
import fr.acinq.eclair
import fr.acinq.bitcoin.{Block, ByteVector32, DeterministicWallet, Protocol, Satoshi}
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
Expand Down Expand Up @@ -155,7 +154,15 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, authenticator: A
if (remoteHasChannelRangeQueriesOptional || remoteHasChannelRangeQueriesMandatory) {
// if they support channel queries, always ask for their filter
// NB: we always add extended info; if peer doesn't understand them it will ignore them
router ! SendChannelQuery(remoteNodeId, d.transport, flags_opt = Some(ExtendedQueryFlags.TIMESTAMPS_AND_CHECKSUMS))

// README: for now we do not activate extended queries on mainnet
val flags_opt = nodeParams.chainHash match {
case Block.RegtestGenesisBlock.hash | Block.TestnetGenesisBlock.hash =>
log.info("using extended range queries")
Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL))
case _ => None
}
router ! SendChannelQuery(remoteNodeId, d.transport, flags_opt = flags_opt)
}

// let's bring existing/requested channels online
Expand Down
127 changes: 82 additions & 45 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package fr.acinq.eclair.wire
import java.net.{Inet4Address, Inet6Address, InetAddress}

import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.eclair.crypto.Mac32
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Satoshi}
import fr.acinq.eclair.crypto.Mac32
import fr.acinq.eclair.{MilliSatoshi, ShortChannelId, UInt64}
import org.apache.commons.codec.binary.Base32
import scodec.bits.{BitVector, ByteVector}
Expand Down Expand Up @@ -57,6 +57,10 @@ object CommonCodecs {
val satoshi: Codec[Satoshi] = uint64overflow.xmapc(l => Satoshi(l))(_.toLong)
val millisatoshi: Codec[MilliSatoshi] = uint64overflow.xmapc(l => MilliSatoshi(l))(_.amount)

// this is needed because some millisatoshi values are encoded on 32 bits in the BOLTs
// this codec will fail if the amount does not fit on 32 bits
val millisatoshi32: Codec[MilliSatoshi] = uint32.xmapc(l => MilliSatoshi(l))(_.amount)

/**
* We impose a minimal encoding on some values (such as varint and truncated int) to ensure that signed hashes can be
* re-computed correctly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package fr.acinq.eclair.wire

import fr.acinq.eclair.wire
import fr.acinq.eclair.wire.CommonCodecs._
import fr.acinq.eclair.{MilliSatoshi, wire}
import scodec.Codec
import scodec.codecs._

Expand Down Expand Up @@ -188,12 +188,11 @@ object LightningMessageCodecs {
("channelFlags" | byte) ::
("cltvExpiryDelta" | uint16) ::
("htlcMinimumMsat" | millisatoshi) ::
("feeBaseMsat" | uint32.xmapc(l => MilliSatoshi(l))(_.amount)) ::
("feeBaseMsat" | millisatoshi32) ::
("feeProportionalMillionths" | uint32) ::
("htlcMaximumMsat" | conditional((messageFlags & 1) != 0, millisatoshi))
})


val channelUpdateWitnessCodec =
("chainHash" | bytes32) ::
("shortChannelId" | shortchannelid) ::
Expand All @@ -202,7 +201,7 @@ object LightningMessageCodecs {
("channelFlags" | byte) ::
("cltvExpiryDelta" | uint16) ::
("htlcMinimumMsat" | millisatoshi) ::
("feeBaseMsat" | uint32.xmapc(l => MilliSatoshi(l))(_.amount)) ::
("feeBaseMsat" | millisatoshi32) ::
("feeProportionalMillionths" | uint32) ::
("htlcMaximumMsat" | conditional((messageFlags & 1) != 0, millisatoshi)) ::
("unknownFields" | bytes)
Expand All @@ -217,51 +216,38 @@ object LightningMessageCodecs {
.\(0) { case a@EncodedShortChannelIds(EncodingType.UNCOMPRESSED, _) => a }((provide[EncodingType](EncodingType.UNCOMPRESSED) :: list(shortchannelid)).as[EncodedShortChannelIds])
.\(1) { case a@EncodedShortChannelIds(EncodingType.COMPRESSED_ZLIB, _) => a }((provide[EncodingType](EncodingType.COMPRESSED_ZLIB) :: zlib(list(shortchannelid))).as[EncodedShortChannelIds])

val encodedQueryFlagsCodec: Codec[EncodedQueryFlags] =
discriminated[EncodedQueryFlags].by(byte)
.\(0) { case a@EncodedQueryFlags(EncodingType.UNCOMPRESSED, _) => a }((provide[EncodingType](EncodingType.UNCOMPRESSED) :: list(byte)).as[EncodedQueryFlags])
.\(1) { case a@EncodedQueryFlags(EncodingType.COMPRESSED_ZLIB, _) => a }((provide[EncodingType](EncodingType.COMPRESSED_ZLIB) :: zlib(list(byte))).as[EncodedQueryFlags])

val queryShortChannelIdsCodec: Codec[QueryShortChannelIds] = (
("chainHash" | bytes32) ::
("shortChannelIds" | variableSizeBytes(uint16, encodedShortChannelIdsCodec)) ::
("queryFlags_opt" | optional(bitsRemaining, variableSizeBytes(uint16, encodedQueryFlagsCodec)))
val queryShortChannelIdsCodec: Codec[QueryShortChannelIds] = {
Codec(
("chainHash" | bytes32) ::
("shortChannelIds" | variableSizeBytes(uint16, encodedShortChannelIdsCodec)) ::
("tlvStream" | QueryShortChannelIdsTlv.codec)
).as[QueryShortChannelIds]
}

val replyShortChanelIdsEndCodec: Codec[ReplyShortChannelIdsEnd] = (
("chainHash" | bytes32) ::
("complete" | byte)
).as[ReplyShortChannelIdsEnd]

val extendedQueryFlagsCodec: Codec[ExtendedQueryFlags] =
discriminated[ExtendedQueryFlags].by(byte)
.typecase(1, provide(ExtendedQueryFlags.TIMESTAMPS_AND_CHECKSUMS))

val queryChannelRangeCodec: Codec[QueryChannelRange] = (
("chainHash" | bytes32) ::
("firstBlockNum" | uint32) ::
("numberOfBlocks" | uint32) ::
("optionExtendedQueryFlags" | optional(bitsRemaining, extendedQueryFlagsCodec))
).as[QueryChannelRange]

val timestampsAndChecksumsCodec: Codec[TimestampsAndChecksums] = (
("timestamp1" | uint32) ::
("timestamp2" | uint32) ::
("checksum1" | uint32) ::
("checksum2" | uint32)
).as[TimestampsAndChecksums]

val extendedInfoCodec: Codec[ExtendedInfo] = list(timestampsAndChecksumsCodec).as[ExtendedInfo]

val replyChannelRangeCodec: Codec[ReplyChannelRange] = (
("chainHash" | bytes32) ::
("firstBlockNum" | uint32) ::
("numberOfBlocks" | uint32) ::
("complete" | byte) ::
("shortChannelIds" | variableSizeBytes(uint16, encodedShortChannelIdsCodec)) ::
("optionExtendedQueryFlags_opt" | optional(bitsRemaining, extendedQueryFlagsCodec)) ::
("extendedInfo_opt" | optional(bitsRemaining, variableSizeBytes(uint16, extendedInfoCodec)))
).as[ReplyChannelRange]
val queryChannelRangeCodec: Codec[QueryChannelRange] = {
Codec(
("chainHash" | bytes32) ::
("firstBlockNum" | uint32) ::
("numberOfBlocks" | uint32) ::
("tlvStream" | QueryChannelRangeTlv.codec)
).as[QueryChannelRange]
}

val replyChannelRangeCodec: Codec[ReplyChannelRange] = {
Codec(
("chainHash" | bytes32) ::
("firstBlockNum" | uint32) ::
("numberOfBlocks" | uint32) ::
("complete" | byte) ::
("shortChannelIds" | variableSizeBytes(uint16, encodedShortChannelIdsCodec)) ::
("tlvStream" | ReplyChannelRangeTlv.codec)
).as[ReplyChannelRange]
}

val gossipTimestampFilterCodec: Codec[GossipTimestampFilter] = (
("chainHash" | bytes32) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import java.net.{Inet4Address, Inet6Address, InetAddress, InetSocketAddress}
import java.nio.charset.StandardCharsets

import com.google.common.base.Charsets
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Satoshi}
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Satoshi}
import fr.acinq.eclair.{MilliSatoshi, ShortChannelId, UInt64}
import scodec.bits.ByteVector

Expand Down Expand Up @@ -233,61 +233,54 @@ object EncodingType {
}
// @formatter:on

case object QueryFlagTypes {
val INCLUDE_CHANNEL_ANNOUNCEMENT: Byte = 1
val INCLUDE_CHANNEL_UPDATE_1: Byte = 2
val INCLUDE_CHANNEL_UPDATE_2: Byte = 4
val INCLUDE_ALL: Byte = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2).toByte

def includeAnnouncement(flag: Byte) = (flag & QueryFlagTypes.INCLUDE_CHANNEL_ANNOUNCEMENT) != 0

def includeUpdate1(flag: Byte) = (flag & QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_1) != 0

def includeUpdate2(flag: Byte) = (flag & QueryFlagTypes.INCLUDE_CHANNEL_UPDATE_2) != 0
}

case class EncodedShortChannelIds(encoding: EncodingType,
array: List[ShortChannelId])

case class EncodedQueryFlags(encoding: EncodingType,
array: List[Byte])

case class QueryShortChannelIds(chainHash: ByteVector32,
shortChannelIds: EncodedShortChannelIds,
queryFlags_opt: Option[EncodedQueryFlags]) extends RoutingMessage with HasChainHash
tlvStream: TlvStream[QueryShortChannelIdsTlv] = TlvStream.empty) extends RoutingMessage with HasChainHash {
val queryFlags_opt: Option[QueryShortChannelIdsTlv.EncodedQueryFlags] = tlvStream.get[QueryShortChannelIdsTlv.EncodedQueryFlags]
}

case class ReplyShortChannelIdsEnd(chainHash: ByteVector32,
complete: Byte) extends RoutingMessage with HasChainHash

// @formatter:off
sealed trait ExtendedQueryFlags
object ExtendedQueryFlags {
case object TIMESTAMPS_AND_CHECKSUMS extends ExtendedQueryFlags
}
// @formatter:on

case class QueryChannelRange(chainHash: ByteVector32,
firstBlockNum: Long,
numberOfBlocks: Long,
extendedQueryFlags_opt: Option[ExtendedQueryFlags]) extends RoutingMessage with HasChainHash
tlvStream: TlvStream[QueryChannelRangeTlv] = TlvStream.empty) extends RoutingMessage {
val queryFlags_opt: Option[QueryChannelRangeTlv.QueryFlags] = tlvStream.get[QueryChannelRangeTlv.QueryFlags]
}

case class ReplyChannelRange(chainHash: ByteVector32,
firstBlockNum: Long,
numberOfBlocks: Long,
complete: Byte,
shortChannelIds: EncodedShortChannelIds,
extendedQueryFlags_opt: Option[ExtendedQueryFlags],
extendedInfo_opt: Option[ExtendedInfo]) extends RoutingMessage with HasChainHash {
extendedInfo_opt.foreach(extendedInfo => require(shortChannelIds.array.size == extendedInfo.array.size, s"shortChannelIds.size=${shortChannelIds.array.size} != extendedInfo.size=${extendedInfo.array.size}"))
tlvStream: TlvStream[ReplyChannelRangeTlv] = TlvStream.empty) extends RoutingMessage {
val timestamps_opt: Option[ReplyChannelRangeTlv.EncodedTimestamps] = tlvStream.get[ReplyChannelRangeTlv.EncodedTimestamps]

val checksums_opt: Option[ReplyChannelRangeTlv.EncodedChecksums] = tlvStream.get[ReplyChannelRangeTlv.EncodedChecksums]
}

case class GossipTimestampFilter(chainHash: ByteVector32,
firstTimestamp: Long,
timestampRange: Long) extends RoutingMessage with HasChainHash
object ReplyChannelRange {
def apply(chainHash: ByteVector32,
firstBlockNum: Long,
numberOfBlocks: Long,
complete: Byte,
shortChannelIds: EncodedShortChannelIds,
timestamps: Option[ReplyChannelRangeTlv.EncodedTimestamps],
checksums: Option[ReplyChannelRangeTlv.EncodedChecksums]) = {
timestamps.foreach(ts => require(ts.timestamps.length == shortChannelIds.array.length))
checksums.foreach(cs => require(cs.checksums.length == shortChannelIds.array.length))
new ReplyChannelRange(chainHash, firstBlockNum, numberOfBlocks, complete, shortChannelIds, TlvStream(timestamps.toList ::: checksums.toList))
}
}

case class TimestampsAndChecksums(timestamp1: Long,
checksum1: Long,
timestamp2: Long,
checksum2: Long)

case class ExtendedInfo(array: List[TimestampsAndChecksums])
case class GossipTimestampFilter(chainHash: ByteVector32,
firstTimestamp: Long,
timestampRange: Long) extends RoutingMessage with HasChainHash
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package fr.acinq.eclair.wire

import fr.acinq.eclair.UInt64
import fr.acinq.eclair.wire.CommonCodecs.{shortchannelid, varint, varintoverflow}
import scodec.Codec
import scodec.codecs._

sealed trait QueryChannelRangeTlv extends Tlv

object QueryChannelRangeTlv {
/**
* Optional query flag that is appended to QueryChannelRange
* @param flag bit 1 set means I want timestamps, bit 2 set means I want checksums
*/
case class QueryFlags(flag: Long) extends QueryChannelRangeTlv {
val wantTimestamps = QueryFlags.wantTimestamps(flag)

val wantChecksums = QueryFlags.wantChecksums(flag)
}

case object QueryFlags {
val WANT_TIMESTAMPS: Long = 1
val WANT_CHECKSUMS: Long = 2
val WANT_ALL: Long = (WANT_TIMESTAMPS | WANT_CHECKSUMS)

def wantTimestamps(flag: Long) = (flag & WANT_TIMESTAMPS) != 0

def wantChecksums(flag: Long) = (flag & WANT_CHECKSUMS) != 0
}

val queryFlagsCodec: Codec[QueryFlags] = Codec(("flag" | varintoverflow)).as[QueryFlags]

val codec: Codec[TlvStream[QueryChannelRangeTlv]] = TlvCodecs.tlvStream(discriminated.by(varint)
.typecase(UInt64(1), variableSizeBytesLong(varintoverflow, queryFlagsCodec))
)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package fr.acinq.eclair.wire

import fr.acinq.eclair.UInt64
import fr.acinq.eclair.wire.CommonCodecs.{shortchannelid, varint, varintoverflow}
import scodec.Codec
import scodec.codecs.{byte, discriminated, list, provide, variableSizeBytesLong, zlib}

sealed trait QueryShortChannelIdsTlv extends Tlv

object QueryShortChannelIdsTlv {

/**
* Optional TLV-based query message that can be appended to QueryShortChannelIds
* @param encoding 0 means uncompressed, 1 means compressed with zlib
* @param array array of query flags, each flags specifies the info we want for a given channel
*/
case class EncodedQueryFlags(encoding: EncodingType, array: List[Long]) extends QueryShortChannelIdsTlv

case object QueryFlagType {
val INCLUDE_CHANNEL_ANNOUNCEMENT: Long = 1
val INCLUDE_CHANNEL_UPDATE_1: Long = 2
val INCLUDE_CHANNEL_UPDATE_2: Long = 4
val INCLUDE_ALL: Long = (INCLUDE_CHANNEL_ANNOUNCEMENT | INCLUDE_CHANNEL_UPDATE_1 | INCLUDE_CHANNEL_UPDATE_2)

def includeAnnouncement(flag: Long) = (flag & INCLUDE_CHANNEL_ANNOUNCEMENT) != 0

def includeUpdate1(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_1) != 0

def includeUpdate2(flag: Long) = (flag & INCLUDE_CHANNEL_UPDATE_2) != 0
}

val encodedQueryFlagsCodec: Codec[EncodedQueryFlags] =
discriminated[EncodedQueryFlags].by(byte)
.\(0) { case a@EncodedQueryFlags(EncodingType.UNCOMPRESSED, _) => a }((provide[EncodingType](EncodingType.UNCOMPRESSED) :: list(varintoverflow)).as[EncodedQueryFlags])
.\(1) { case a@EncodedQueryFlags(EncodingType.COMPRESSED_ZLIB, _) => a }((provide[EncodingType](EncodingType.COMPRESSED_ZLIB) :: zlib(list(varintoverflow))).as[EncodedQueryFlags])


val codec: Codec[TlvStream[QueryShortChannelIdsTlv]] = TlvCodecs.tlvStream(discriminated.by(varint)
.typecase(UInt64(1), variableSizeBytesLong(varintoverflow, encodedQueryFlagsCodec))
)
}
Loading

0 comments on commit 0780fc2

Please sign in to comment.