Various improvements and fixes #1817

Merged: 8 commits, May 25, 2021.
Changes shown from 7 commits.
@@ -401,7 +401,11 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
     def getUnconfirmedAncestorCount(utxo: Utxo): Future[(ByteVector32, Long)] = client.rpcClient.invoke("getmempoolentry", utxo.txId).map(json => {
       val JInt(ancestorCount) = json \ "ancestorcount"
       (utxo.txId, ancestorCount.toLong)
-    })
+    }).recover {
+      case ex: Throwable =>
+        log.warn(s"could not retrieve unconfirmed ancestor count for txId=${utxo.txId} amount=${utxo.amount}", ex)
+        (utxo.txId, 0)
+    }
 
     def getUnconfirmedAncestorCountMap(utxos: Seq[Utxo]): Future[Map[ByteVector32, Long]] = Future.sequence(utxos.filter(_.confirmations == 0).map(getUnconfirmedAncestorCount)).map(_.toMap)
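For readers skimming the diff: Future.sequence fails wholesale if any element fails, so without the .recover a single failed getmempoolentry call would sink the whole ancestor-count map. A minimal standalone sketch of that behavior (hypothetical txIds and counts, not eclair code):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object RecoverSketch extends App {
  // Stand-in for the getmempoolentry RPC call; "bad" simulates an RPC failure.
  def ancestorCount(txId: String): Future[(String, Long)] =
    if (txId == "bad") Future.failed(new RuntimeException("getmempoolentry failed"))
    else Future.successful((txId, 2L))

  val safe = Seq("a", "bad", "c").map(txId => ancestorCount(txId).recover {
    case _: Throwable => (txId, 0L) // degrade to a default count instead of failing the batch
  })
  // Without the .recover above, this Await would throw.
  println(Await.result(Future.sequence(safe), 1.second).toMap) // Map(a -> 2, bad -> 0, c -> 2)
}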
@@ -85,7 +85,7 @@ object ExplorerApi
         Behaviors.stopped
 
       case WrappedFailure(e) =>
-        context.log.error(s"${explorer.name} failed: ", e)
+        context.log.warn(s"${explorer.name} failed: ", e)
         Metrics.WatchdogError.withTag(Tags.Source, explorer.name).increment()
         Behaviors.stopped
     }
13 changes: 11 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
@@ -1443,7 +1443,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       // finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
       closingType_opt match {
         case Some(closingType) =>
-          log.info(s"channel closed (type=$closingType)")
+          log.info(s"channel closed (type=${closingType_opt.map(c => EventType.Closed(c).label).getOrElse("UnknownYet")})")
           context.system.eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments))
           goto(CLOSED) using d1 storing()
         case None =>
@@ -1701,6 +1701,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
       peer ! Peer.Disconnect(remoteNodeId)
       stay
 
+    // This handler is a workaround for an issue in lnd similar to the one above: they sometimes send announcement_signatures
+    // before channel_reestablish, which is a minor spec violation. It doesn't halt the channel, we can simply postpone
+    // that message.
+    case Event(remoteAnnSigs: AnnouncementSignatures, _) =>
+      log.warning("received announcement_signatures before channel_reestablish (known lnd bug): delaying...")
+      context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
+      stay
+
     case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
 
     case Event(c: CurrentFeerates, d: HasCommitments) =>
@@ -2125,11 +2133,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
   private def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = {
     cause match {
       case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
+      case _ if stateName == WAIT_FOR_OPEN_CHANNEL => log.info(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
       case _ => log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
     }
     cause match {
       case _: ChannelException => ()
-      case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData ")
+      case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} in state=$stateName ")
     }
     val error = Error(d.channelId, cause.getMessage)
     context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true))
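The announcement_signatures workaround above uses a common Akka pattern: when a message arrives before the state that can handle it, re-send it to self after a delay instead of dropping it. A minimal self-contained sketch of that pattern (hypothetical actor, not eclair code):

import akka.actor.Actor
import scala.concurrent.duration._

// Hypothetical handshake actor: a message that arrives too early is scheduled
// back to self and retried once the expected state has been reached.
class HandshakeActor extends Actor {
  import context.dispatcher // ExecutionContext for the scheduler
  private var reestablished = false

  def receive: Receive = {
    case "channel_reestablish" =>
      reestablished = true
    case msg @ "announcement_signatures" if !reestablished =>
      // too early: postpone instead of dropping (mirrors the workaround above)
      context.system.scheduler.scheduleOnce(5.seconds, self, msg)
    case "announcement_signatures" =>
      println("processing announcement_signatures")
  }
}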
@@ -356,7 +356,7 @@ object Sphinx extends Logging {
     require(packet.length == PacketLength, s"invalid error packet length ${packet.length}, must be $PacketLength")
 
     @tailrec
-    def loop(packet: ByteVector, sharedSecrets: Seq[(ByteVector32, PublicKey)]): DecryptedFailurePacket = sharedSecrets match {
+    def loop(packet: ByteVector, secrets: Seq[(ByteVector32, PublicKey)]): DecryptedFailurePacket = secrets match {
       case Nil => throw new RuntimeException(s"couldn't parse error packet=$packet with sharedSecrets=$sharedSecrets")
       case (secret, pubkey) :: tail =>
         val packet1 = wrap(packet, secret)
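This rename is more than cosmetic: the inner parameter previously shadowed the enclosing sharedSecrets, so the interpolation in the Nil branch could only ever show the exhausted inner list (empty by then); after the rename it reports the full outer list. A minimal sketch of that shadowing problem (hypothetical types and values, not eclair code):

object ShadowSketch extends App {
  def decrypt(sharedSecrets: List[Int]): Unit = {
    // Had the parameter been named sharedSecrets, the message below would
    // interpolate the inner (exhausted, empty) list instead of the outer one.
    def loop(secrets: List[Int]): Unit = secrets match {
      case Nil => throw new RuntimeException(s"no secret worked, tried $sharedSecrets")
      case _ :: tail => loop(tail)
    }
    loop(sharedSecrets)
  }
  try decrypt(List(1, 2, 3))
  catch { case e: RuntimeException => println(e.getMessage) } // no secret worked, tried List(1, 2, 3)
}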
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
@@ -16,8 +16,7 @@
 
 package fr.acinq.eclair.io
 
-import akka.actor.typed
-import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated}
+import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated, typed}
 import akka.event.Logging.MDC
 import akka.event.{BusLogging, DiagnosticLoggingAdapter}
 import akka.util.Timeout
@@ -86,6 +85,12 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: EclairWa
       stay using d.copy(channels = channels1)
     }
 
+    // This event is usually handled while we're connected, but if our peer disconnects right when we're emitting this,
+    // we still want to record the channelId mapping.
+    case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d: DisconnectedData) =>
+      log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
+      stay using d.copy(channels = d.channels + (FinalChannelId(channelId) -> channel))
+
     case Event(_: LightningMessage, _) => stay // we probably just got disconnected and that's the last messages we received
   }
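The new DISCONNECTED handler exists because the temporary-to-final channelId switch must be recorded even if the peer drops the connection at exactly the wrong moment; otherwise later lookups by the final id would miss. A minimal sketch of the bookkeeping (hypothetical string ids, not eclair code):

object ChannelIdSwitchSketch extends App {
  // A peer tracks channel actors first by temporary id, then also by final id.
  var channels: Map[String, String] = Map("temp-1" -> "channel-actor-1")

  // Recording the switch regardless of connection state keeps lookups by the
  // final channelId working after a reconnect.
  def onChannelIdAssigned(temporaryChannelId: String, channelId: String): Unit =
    channels.get(temporaryChannelId).foreach(channel => channels += (channelId -> channel))

  onChannelIdAssigned("temp-1", "final-abc")
  println(channels.keySet) // Set(temp-1, final-abc)
}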
@@ -70,7 +70,12 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial
     val nonStandardRelayedOutHtlcs: Map[Origin, Set[(ByteVector32, Long)]] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn, nodeParams, log) }.flatten.toMap
     val relayedOut: Map[Origin, Set[(ByteVector32, Long)]] = getHtlcsRelayedOut(channels, htlcsIn) ++ nonStandardRelayedOutHtlcs
 
-    val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)))
+    val notRelayed = htlcsIn.filterNot(htlcIn => {
+      // If an HTLC has been relayed and then settled downstream, it will not have a matching entry in relayedOut.
+      // When that happens, there will be an HTLC settlement command in the pendingRelay DB, and we will let the channel
+      // replay it instead of sending a conflicting command.
+      relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)) || nodeParams.db.pendingCommands.listSettlementCommands(htlcIn.add.channelId).exists(_.id == htlcIn.add.id)
+    })
     cleanupRelayDb(htlcsIn, nodeParams.db.pendingCommands)
 
     log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}")
@@ -332,6 +337,7 @@ object PostRestartHtlcCleaner {
   def groupByOrigin(htlcsOut: Seq[(Origin, ByteVector32, Long)], htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] =
     htlcsOut
       .groupBy { case (origin, _, _) => origin }
+      .view
       .mapValues(_.map { case (_, channelId, htlcId) => (channelId, htlcId) }.toSet)
       // We are only interested in HTLCs that are pending upstream (not fulfilled nor failed yet).
       // It may be the case that we have unresolved HTLCs downstream that have been resolved upstream when the downstream
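The added .view is the standard Scala 2.13 idiom: mapValues on a strict Map is deprecated there, and the supported replacement is .view.mapValues(...), which is lazy and is typically materialized again with .toMap at the end of the chain (the rest of this method is collapsed in the diff). A minimal sketch with made-up data:

object MapValuesSketch extends App {
  val grouped = Seq(("o1", "chan1", 1L), ("o1", "chan2", 2L), ("o2", "chan1", 3L))
    .groupBy { case (origin, _, _) => origin }
    .view // Scala 2.13: Map.mapValues is deprecated; MapView.mapValues is the replacement
    .mapValues(_.map { case (_, channelId, htlcId) => (channelId, htlcId) }.toSet)
    .toMap // materialize the lazy view back into a strict Map
  println(grouped) // Map(o1 -> Set((chan1,1), (chan2,2)), o2 -> Set((chan1,3)))
}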
@@ -177,7 +177,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
           }
           RemoteFailure(cfg.fullRoute(route), e)
         case Failure(t) =>
-          log.warning(s"cannot parse returned error: ${t.getMessage}")
+          log.warning(s"cannot parse returned error ${fail.reason.toHex} with sharedSecrets=$sharedSecrets: ${t.getMessage}")
           UnreadableRemoteFailure(cfg.fullRoute(route))
       }
       log.warning(s"too many failed attempts, failing the payment")
@@ -24,9 +24,9 @@ import fr.acinq.eclair.Features.StaticRemoteKey
 import fr.acinq.eclair.TestConstants.{Alice, Bob}
 import fr.acinq.eclair.UInt64.Conversions._
 import fr.acinq.eclair._
-import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
 import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
 import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw}
+import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
 import fr.acinq.eclair.channel.Channel._
 import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx}
 import fr.acinq.eclair.channel._
@@ -39,7 +39,7 @@ import fr.acinq.eclair.router.Announcements
 import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing}
 import fr.acinq.eclair.transactions.Transactions
 import fr.acinq.eclair.transactions.Transactions.{DefaultCommitmentFormat, HtlcSuccessTx, weight2fee}
-import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
+import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, TemporaryNodeFailure, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
 import org.scalatest.funsuite.FixtureAnyFunSuiteLike
 import org.scalatest.{Outcome, Tag}
 import scodec.bits._
@@ -1364,6 +1364,25 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
     assert(initialState == bob.stateData)
   }
 
+  test("recv CMD_FAIL_HTLC (htlc pending fulfill)") { f =>
+    import f._
+
+    val sender = TestProbe()
+    val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
+    crossSign(alice, bob, alice2bob, bob2alice)
+
+    // HTLC is fulfilled but alice doesn't send its revocation.
+    bob ! CMD_FULFILL_HTLC(htlc.id, r)
+    bob ! CMD_SIGN()
+    bob2alice.expectMsgType[UpdateFulfillHtlc]
+    bob2alice.expectMsgType[CommitSig]
+
+    // We cannot fail the HTLC, we must wait for the fulfill to be acked.
+    val c = CMD_FAIL_HTLC(htlc.id, Right(TemporaryNodeFailure), replyTo_opt = Some(sender.ref))
+    bob ! c
+    sender.expectMsg(RES_FAILURE(c, UnknownHtlcId(channelId(bob), htlc.id)))
+  }
+
   test("recv CMD_FAIL_HTLC (acknowledge in case of failure)") { f =>
     import f._
     val sender = TestProbe()
17 changes: 17 additions & 0 deletions eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
@@ -367,6 +367,23 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
     assert(init.fundingAmount === 15000.sat)
     assert(init.pushAmount === 100.msat)
   }
+
+  test("handle final channelId assigned in state DISCONNECTED") { f =>
+    import f._
+    val probe = TestProbe()
+    connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal))
+    peer ! ConnectionDown(peerConnection.ref)
+    probe.send(peer, Peer.GetPeerInfo)
+    val peerInfo1 = probe.expectMsgType[Peer.PeerInfo]
+    assert(peerInfo1.state === "DISCONNECTED")
+    assert(peerInfo1.channels === 1)
+    peer ! ChannelIdAssigned(probe.ref, remoteNodeId, randomBytes32(), randomBytes32())
+    probe.send(peer, Peer.GetPeerInfo)
+    val peerInfo2 = probe.expectMsgType[Peer.PeerInfo]
+    assert(peerInfo2.state === "DISCONNECTED")
+    assert(peerInfo2.channels === 2)
+  }
 
 }
 
 object PeerSpec {