Skip to content

Commit

Permalink
Relax single tx input requirements (#1677)
Browse files Browse the repository at this point in the history
In some places of the codebase we relied on the fact that lightning transactions
had a single input. That was correct with the standard commitments format,
but will not be the case with anchor outputs: 2nd-stage txs (htlc-txs) and
3rd-stage txs (claim-htlc-txs) can be RBF-ed and have any number of inputs
and outputs.
  • Loading branch information
t-bast committed Feb 18, 2021
1 parent 9618a6a commit ab89851
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

/**
* A blockchain watcher that:
* - receives bitcoin events (new blocks and new txes) directly from the bitcoin network
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and blockcount (because reorgs)
* Created by PM on 21/02/2016.
*/

/**
* A blockchain watcher that:
* - receives bitcoin events (new blocks and new txs) directly from the bitcoin network
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and block count (because reorgs)
*/
class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {

import ZmqWatcher._
Expand Down Expand Up @@ -180,13 +183,15 @@ class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: Extend
case PublishAsap(tx) =>
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
require(tx.txIn.size == 1, s"watcher only supports tx with 1 input, this tx has ${tx.txIn.size} inputs")
val parentTxid = tx.txIn.head.outPoint.txid
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parenttxid=$parentTxid tx={}", tx)
val parentPublicKey = fr.acinq.bitcoin.Script.write(fr.acinq.bitcoin.Script.pay2wsh(tx.txIn.head.witness.stack.last))
self ! WatchConfirmed(self, parentTxid, parentPublicKey, minDepth = 1, BITCOIN_PARENT_TX_CONFIRMED(tx))
val csvTimeouts = Scripts.csvTimeouts(tx)
if (csvTimeouts.nonEmpty) {
// watcher supports txs with multiple csv-delayed inputs: we watch all delayed parents and try to publish every
// time a parent's relative delays are satisfied, so we will eventually succeed.
csvTimeouts.foreach { case (parentTxId, csvTimeout) =>
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parentTxId=$parentTxId tx={}", tx)
val parentPublicKeyScript = Script.write(Script.pay2wsh(tx.txIn.find(_.outPoint.txid == parentTxId).get.witness.stack.last))
self ! WatchConfirmed(self, parentTxId, parentPublicKeyScript, minDepth = csvTimeout, BITCOIN_PARENT_TX_CONFIRMED(tx))
}
} else if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
Expand All @@ -197,11 +202,9 @@ class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: Extend
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = math.max(blockHeight + csvTimeout, cltvTimeout)
if (absTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(absTimeout, block2tx.getOrElse(absTimeout, Seq.empty[Transaction]) :+ tx)
if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
context become watching(watches, watchedUtxos, block2tx1, nextTick)
} else publish(tx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
case PublishAsap(tx) =>
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
require(tx.txIn.size == 1, s"watcher only supports tx with 1 input, this tx has ${tx.txIn.size} inputs")
val parentTxid = tx.txIn.head.outPoint.txid
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parenttxid=$parentTxid tx={}", tx)
val parentPublicKeyScript = WatchConfirmed.extractPublicKeyScript(tx.txIn.head.witness)
self ! WatchConfirmed(self, parentTxid, parentPublicKeyScript, minDepth = 1, BITCOIN_PARENT_TX_CONFIRMED(tx))
val csvTimeouts = Scripts.csvTimeouts(tx)
if (csvTimeouts.nonEmpty) {
// watcher supports txs with multiple csv-delayed inputs: we watch all delayed parents and try to publish every
// time a parent's relative delays are satisfied, so we will eventually succeed.
csvTimeouts.foreach { case (parentTxId, csvTimeout) =>
log.info(s"txid=${tx.txid} has a relative timeout of $csvTimeout blocks, watching parentTxId=$parentTxId tx={}", tx)
val parentPublicKeyScript = WatchConfirmed.extractPublicKeyScript(tx.txIn.find(_.outPoint.txid == parentTxId).get.witness)
self ! WatchConfirmed(self, parentTxId, parentPublicKeyScript, minDepth = csvTimeout, BITCOIN_PARENT_TX_CONFIRMED(tx))
}
} else if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
Expand All @@ -193,11 +195,9 @@ class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor wi
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = math.max(blockHeight + csvTimeout, cltvTimeout)
if (absTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(absTimeout, block2tx.getOrElse(absTimeout, Seq.empty[Transaction]) :+ tx)
if (cltvTimeout > blockCount) {
log.info(s"delaying publication of txid=${tx.txid} until block=$cltvTimeout (curblock=$blockCount)")
val block2tx1 = block2tx.updated(cltvTimeout, block2tx.getOrElse(cltvTimeout, Seq.empty[Transaction]) :+ tx)
context become running(height, tip, watches, scriptHashStatus, block2tx1, sent)
} else {
publish(tx)
Expand Down
23 changes: 11 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, tx_opt) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
tx_opt.foreach(claimTx => blockchain ! PublishAsap(claimTx))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
tx_opt.foreach(claimTx => blockchain ! WatchSpent(self, tx, claimTx.txIn.filter(_.outPoint.txid == tx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
rev1
}
stay using d.copy(revokedCommitPublished = revokedCommitPublished1) storing()
Expand Down Expand Up @@ -2116,10 +2116,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

/**
* This helper method will publish txes only if they haven't yet reached minDepth
* This helper method will publish txs only if they haven't yet reached minDepth
*/
def publishIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
def publishIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach { tx =>
log.info(s"publishing txid=${tx.txid}")
blockchain ! PublishAsap(tx)
Expand All @@ -2128,20 +2128,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

/**
* This helper method will watch txes only if they haven't yet reached minDepth
* This helper method will watch txs only if they haven't yet reached minDepth
*/
def watchConfirmedIfNeeded(txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchConfirmed(self, tx, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED(tx)))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}

/**
* This helper method will watch txes only if the utxo they spend hasn't already been irrevocably spent
* This helper method will watch txs only if the utxo they spend hasn't already been irrevocably spent
*/
def watchSpentIfNeeded(parentTx: Transaction, txes: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txes.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
def watchSpentIfNeeded(parentTx: Transaction, txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]): Unit = {
val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent))
process.foreach(tx => blockchain ! WatchSpent(self, parentTx, tx.txIn.filter(_.outPoint.txid == parentTx.txid).head.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT))
skip.foreach(tx => log.info(s"no need to watch txid=${tx.txid}, it has already been confirmed"))
}

Expand Down Expand Up @@ -2226,7 +2226,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

def handleRemoteSpentOther(tx: Transaction, d: HasCommitments) = {
log.warning(s"funding tx spent in txid=${tx.txid}")

Helpers.Closing.claimRevokedRemoteCommitTxOutputs(keyManager, d.commitments, tx, nodeParams.db.channels, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets) match {
case Some(revokedCommitPublished) =>
log.warning(s"txid=${tx.txid} was a revoked commitment, publishing the penalty tx")
Expand Down
Loading

0 comments on commit ab89851

Please sign in to comment.