Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a random delay before processing blocks #1825

Merged
merged 6 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ eclair {
// expiry-delta-blocks.
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect

fee-base-msat = 1000
fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%)
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
expiryDelta: CltvExpiryDelta,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxBlockProcessingDelay: FiniteDuration,
htlcMinimum: MilliSatoshi,
toRemoteDelay: CltvExpiryDelta,
maxToLocalDelay: CltvExpiryDelta,
Expand Down Expand Up @@ -337,6 +338,7 @@ object NodeParams extends Logging {
expiryDelta = expiryDelta,
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxBlockProcessingDelay = FiniteDuration(config.getDuration("max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
htlcMinimum = htlcMinimum,
toRemoteDelay = CltvExpiryDelta(config.getInt("to-remote-delay-blocks")),
maxToLocalDelay = CltvExpiryDelta(config.getInt("max-to-local-delay-blocks")),
Expand Down
27 changes: 17 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,19 @@ import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock.logger
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.PaymentSettlingOnChain
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Transactions.{ClosingTx, TxOwner}
import fr.acinq.eclair.transactions._
import fr.acinq.eclair.wire.protocol._
import org.sqlite.SQLiteException
import scodec.bits.ByteVector

import java.sql.SQLException
import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Random, Success, Try}

/**
* Created by PM on 20/08/2015.
Expand Down Expand Up @@ -125,6 +123,9 @@ object Channel {
*/
case class OutgoingMessage(msg: LightningMessage, peerConnection: ActorRef)

/** We don't immediately process [[CurrentBlockCount]] to avoid herd effects */
case class ProcessCurrentBlockCount(c: CurrentBlockCount)

}

class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: typed.ActorRef[ZmqWatcher.Command], relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
Expand All @@ -150,6 +151,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

// this will be used to detect htlc timeouts
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
// the constant delay by which we delay processing of blocks (it will be smoothened among all channels)
private val blockProcessingDelay = Random.nextLong(nodeParams.maxBlockProcessingDelay.toMillis + 1).millis
// this will be used to make sure the current commitment fee is up-to-date
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])

Expand Down Expand Up @@ -631,7 +634,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(BITCOIN_FUNDING_PUBLISH_FAILED, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => handleFundingPublishFailed(d)

case Event(c: CurrentBlockCount, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
case Event(ProcessCurrentBlockCount(c), d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
case Some(_) => stay // we are funder, we're still waiting for the funding tx to be confirmed
case None if c.blockCount - d.waitingSinceBlock > FUNDING_TIMEOUT_FUNDEE =>
log.warning(s"funding tx hasn't been published in ${c.blockCount - d.waitingSinceBlock} blocks")
Expand Down Expand Up @@ -943,7 +946,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}

case Event(c: CurrentBlockCount, d: DATA_NORMAL) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: DATA_NORMAL) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: DATA_NORMAL) => handleCurrentFeerate(c, d)

Expand Down Expand Up @@ -1221,7 +1224,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d)

case Event(c: CurrentBlockCount, d: DATA_SHUTDOWN) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: DATA_SHUTDOWN) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: DATA_SHUTDOWN) => handleCurrentFeerate(c, d)

Expand Down Expand Up @@ -1519,7 +1522,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// note: this can only happen if state is NORMAL or SHUTDOWN
// -> in NEGOTIATING there are no more htlcs
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
Expand Down Expand Up @@ -1710,7 +1713,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
stay

case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
Expand Down Expand Up @@ -1792,8 +1795,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled
case Event(_: RevocationTimeout, _) => stay

// we reschedule with a random delay to prevent herd effect when there are a lot of channels
case Event(c: CurrentBlockCount, _) =>
context.system.scheduler.scheduleOnce(blockProcessingDelay, self, ProcessCurrentBlockCount(c))
stay

// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
case Event(CurrentBlockCount(_), _) => stay
case Event(ProcessCurrentBlockCount(_), _) => stay

// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
case Event(CurrentFeerates(_), _) => stay
Expand Down Expand Up @@ -2568,4 +2576,3 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

}


Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ object TestConstants {
expiryDelta = CltvExpiryDelta(144),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxBlockProcessingDelay = 10 millis,
htlcMinimum = 0 msat,
minDepthBlocks = 3,
toRemoteDelay = CltvExpiryDelta(144),
Expand Down Expand Up @@ -215,6 +216,7 @@ object TestConstants {
expiryDelta = CltvExpiryDelta(144),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxBlockProcessingDelay = 10 millis,
htlcMinimum = 1000 msat,
minDepthBlocks = 3,
toRemoteDelay = CltvExpiryDelta(144),
Expand Down Expand Up @@ -287,4 +289,4 @@ object TestTags {
// Tests that call an external API (which may start failing independently of our code).
object ExternalApi extends Tag("external-api")

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
"eclair.bitcoind.wallet" -> defaultWallet,
"eclair.mindepth-blocks" -> 2,
"eclair.max-htlc-value-in-flight-msat" -> 100000000000L,
"eclair.router.broadcast-interval" -> "2 second",
"eclair.max-block-processing-delay" -> "2 seconds",
pm47 marked this conversation as resolved.
Show resolved Hide resolved
"eclair.router.broadcast-interval" -> "2 seconds",
"eclair.auto-reconnect" -> false,
"eclair.to-remote-delay-blocks" -> 24,
"eclair.multi-part-payment-expiry" -> "20 seconds").asJava).withFallback(ConfigFactory.load())
Expand Down