Skip to content

Commit

Permalink
Removed Globals class (#1127)
Browse files Browse the repository at this point in the history
This is a prerequisite to parallelization of tests.
  • Loading branch information
pm47 committed Sep 11, 2019
1 parent 26e4432 commit 2fbf46a
Show file tree
Hide file tree
Showing 45 changed files with 429 additions and 419 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case class CltvExpiryDelta(private val underlying: Int) extends Ordered[CltvExpi
/**
* Adds the current block height to the given delta to obtain an absolute expiry.
*/
def toCltvExpiry = CltvExpiry(Globals.blockCount.get() + underlying)
def toCltvExpiry(blockHeight: Long) = CltvExpiry(blockHeight + underlying)

// @formatter:off
def +(other: Int): CltvExpiryDelta = CltvExpiryDelta(underlying + other)
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class EclairImpl(appKit: Kit) extends Eclair {
GetInfoResponse(nodeId = appKit.nodeParams.nodeId,
alias = appKit.nodeParams.alias,
chainHash = appKit.nodeParams.chainHash,
blockHeight = Globals.blockCount.intValue(),
blockHeight = appKit.nodeParams.currentBlockHeight.toInt,
publicAddresses = appKit.nodeParams.publicAddresses)
)

Expand Down
48 changes: 0 additions & 48 deletions eclair-core/src/main/scala/fr/acinq/eclair/Globals.scala

This file was deleted.

6 changes: 5 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File
import java.net.InetSocketAddress
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import com.typesafe.config.{Config, ConfigFactory}
import fr.acinq.bitcoin.Crypto.PublicKey
Expand All @@ -41,6 +42,7 @@ import scala.concurrent.duration.FiniteDuration
* Created by PM on 26/02/2017.
*/
case class NodeParams(keyManager: KeyManager,
private val blockCount: AtomicLong,
alias: String,
color: Color,
publicAddresses: List[NodeAddress],
Expand Down Expand Up @@ -80,6 +82,7 @@ case class NodeParams(keyManager: KeyManager,
maxPaymentAttempts: Int) {
val privateKey = keyManager.nodeKey.privateKey
val nodeId = keyManager.nodeId
def currentBlockHeight: Long = blockCount.get
}

object NodeParams {
Expand Down Expand Up @@ -124,7 +127,7 @@ object NodeParams {
}
}

def makeNodeParams(config: Config, keyManager: KeyManager, torAddress_opt: Option[NodeAddress], database: Databases, feeEstimator: FeeEstimator): NodeParams = {
def makeNodeParams(config: Config, keyManager: KeyManager, torAddress_opt: Option[NodeAddress], database: Databases, blockCount: AtomicLong, feeEstimator: FeeEstimator): NodeParams = {

val chain = config.getString("chain")
val chainHash = makeChainHash(chain)
Expand Down Expand Up @@ -201,6 +204,7 @@ object NodeParams {

NodeParams(
keyManager = keyManager,
blockCount = blockCount,
alias = nodeAlias,
color = Color(color(0), color(1), color(2)),
publicAddresses = addresses,
Expand Down
44 changes: 32 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File
import java.net.InetSocketAddress
import java.sql.DriverManager
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import akka.Done
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy}
Expand Down Expand Up @@ -93,12 +94,31 @@ class Setup(datadir: File,
case None => Databases.sqliteJDBC(chaindir)
}

/**
* This counter holds the current blockchain height.
* It is mainly used to calculate htlc expiries.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val blockCount = new AtomicLong(0)

/**
* This holds the current feerates, in satoshi-per-kilobytes.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val feeratesPerKB = new AtomicReference[FeeratesPerKB](null)

/**
* This holds the current feerates, in satoshi-per-kw.
* The value is read by all actors, hence it needs to be thread-safe.
*/
val feeratesPerKw = new AtomicReference[FeeratesPerKw](null)

val feeEstimator = new FeeEstimator {
override def getFeeratePerKb(target: Int): Long = Globals.feeratesPerKB.get().feePerBlock(target)
override def getFeeratePerKw(target: Int): Long = Globals.feeratesPerKw.get().feePerBlock(target)
override def getFeeratePerKb(target: Int): Long = feeratesPerKB.get().feePerBlock(target)
override def getFeeratePerKw(target: Int): Long = feeratesPerKw.get().feePerBlock(target)
}

val nodeParams = NodeParams.makeNodeParams(config, keyManager, initTor(), database, feeEstimator)
val nodeParams = NodeParams.makeNodeParams(config, keyManager, initTor(), database, blockCount, feeEstimator)

val serverBindingAddress = new InetSocketAddress(
config.getString("server.binding-ip"),
Expand Down Expand Up @@ -170,7 +190,7 @@ class Setup(datadir: File,
val stream = classOf[Setup].getResourceAsStream(addressesFile)
ElectrumClientPool.readServerAddresses(stream, sslEnabled)
}
val electrumClient = system.actorOf(SimpleSupervisor.props(Props(new ElectrumClientPool(addresses)), "electrum-client", SupervisorStrategy.Resume))
val electrumClient = system.actorOf(SimpleSupervisor.props(Props(new ElectrumClientPool(blockCount, addresses)), "electrum-client", SupervisorStrategy.Resume))
Electrum(electrumClient)
}

Expand All @@ -193,8 +213,8 @@ class Setup(datadir: File,
blocks_72 = config.getLong("on-chain-fees.default-feerates.72"),
blocks_144 = config.getLong("on-chain-fees.default-feerates.144")
)
Globals.feeratesPerKB.set(confDefaultFeerates)
Globals.feeratesPerKw.set(FeeratesPerKw(confDefaultFeerates))
feeratesPerKB.set(confDefaultFeerates)
feeratesPerKw.set(FeeratesPerKw(confDefaultFeerates))
confDefaultFeerates
}
minFeeratePerByte = config.getLong("min-feerate")
Expand All @@ -208,10 +228,10 @@ class Setup(datadir: File,
}
_ = system.scheduler.schedule(0 seconds, 10 minutes)(feeProvider.getFeerates.map {
case feerates: FeeratesPerKB =>
Globals.feeratesPerKB.set(feerates)
Globals.feeratesPerKw.set(FeeratesPerKw(feerates))
system.eventStream.publish(CurrentFeerates(Globals.feeratesPerKw.get))
logger.info(s"current feeratesPerKB=${Globals.feeratesPerKB.get()} feeratesPerKw=${Globals.feeratesPerKw.get()}")
feeratesPerKB.set(feerates)
feeratesPerKw.set(FeeratesPerKw(feerates))
system.eventStream.publish(CurrentFeerates(feeratesPerKw.get))
logger.info(s"current feeratesPerKB=${feeratesPerKB.get()} feeratesPerKw=${feeratesPerKw.get()}")
feeratesRetrieved.trySuccess(Done)
})
_ <- feeratesRetrieved.future
Expand All @@ -220,11 +240,11 @@ class Setup(datadir: File,
case Bitcoind(bitcoinClient) =>
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
case Electrum(electrumClient) =>
zmqBlockConnected.success(Done)
zmqTxConnected.success(Done)
system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(electrumClient)), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(Props(new ElectrumWatcher(blockCount, electrumClient)), "watcher", SupervisorStrategy.Resume))
}

router = system.actorOf(SimpleSupervisor.props(Router.props(nodeParams, watcher, Some(routerInitialized)), "router", SupervisorStrategy.Resume))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package fr.acinq.eclair.blockchain.bitcoind

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

import akka.actor.{Actor, ActorLogging, Cancellable, Props, Terminated}
import akka.pattern.pipe
import fr.acinq.bitcoin._
import fr.acinq.eclair.Globals
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED
Expand All @@ -39,7 +39,7 @@ import scala.util.Try
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and blockcount (because reorgs)
* Created by PM on 21/02/2016.
*/
class ZmqWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {
class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {

import ZmqWatcher._

Expand Down Expand Up @@ -80,7 +80,7 @@ class ZmqWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext =
client.getBlockCount.map {
case count =>
log.debug(s"setting blockCount=$count")
Globals.blockCount.set(count)
blockCount.set(count)
context.system.eventStream.publish(CurrentBlockCount(count))
}
// TODO: beware of the herd effect
Expand Down Expand Up @@ -151,7 +151,7 @@ class ZmqWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext =
context become watching(watches + w, addWatchedUtxos(watchedUtxos, w), block2tx, nextTick)

case PublishAsap(tx) =>
val blockCount = Globals.blockCount.get()
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
Expand All @@ -168,7 +168,7 @@ class ZmqWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext =

case WatchEventConfirmed(BITCOIN_PARENT_TX_CONFIRMED(tx), blockHeight, _, _) =>
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = Globals.blockCount.get()
val blockCount = this.blockCount.get()
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = blockHeight + csvTimeout
if (absTimeout > blockCount) {
Expand Down Expand Up @@ -226,7 +226,7 @@ class ZmqWatcher(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext =

object ZmqWatcher {

def props(client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(client)(ec))
def props(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(blockCount, client)(ec))

case object TickNewBlock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package fr.acinq.eclair.blockchain.electrum

import java.io.InputStream
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicLong

import akka.actor.{Actor, ActorRef, FSM, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.eclair.Globals
import fr.acinq.eclair.blockchain.CurrentBlockCount
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.SSL
import fr.acinq.eclair.blockchain.electrum.ElectrumClientPool.ElectrumServerAddress
Expand All @@ -32,7 +32,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Random

class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit val ec: ExecutionContext) extends Actor with FSM[ElectrumClientPool.State, ElectrumClientPool.Data] {
class ElectrumClientPool(blockCount: AtomicLong, serverAddresses: Set[ElectrumServerAddress])(implicit val ec: ExecutionContext) extends Actor with FSM[ElectrumClientPool.State, ElectrumClientPool.Data] {
import ElectrumClientPool._

val statusListeners = collection.mutable.HashSet.empty[ActorRef]
Expand Down Expand Up @@ -166,10 +166,10 @@ class ElectrumClientPool(serverAddresses: Set[ElectrumServerAddress])(implicit v

private def updateBlockCount(blockCount: Long): Unit = {
// when synchronizing we don't want to advertise previous blocks
if (Globals.blockCount.get() < blockCount) {
if (this.blockCount.get() < blockCount) {
log.debug("current blockchain height={}", blockCount)
context.system.eventStream.publish(CurrentBlockCount(blockCount))
Globals.blockCount.set(blockCount)
this.blockCount.set(blockCount)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

package fr.acinq.eclair.blockchain.electrum

import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicLong

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Stash, Terminated}
import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
import fr.acinq.bitcoin.{BlockHeader, ByteVector32, Script, Transaction, TxIn, TxOut}
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.{SSL, computeScriptHash}
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_DEPTHOK, BITCOIN_FUNDING_SPENT, BITCOIN_PARENT_TX_CONFIRMED}
import fr.acinq.eclair.blockchain.electrum.ElectrumClient.computeScriptHash
import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.{Globals, LongToBtcAmount, ShortChannelId, TxCoordinates}
import fr.acinq.eclair.{LongToBtcAmount, ShortChannelId, TxCoordinates}

import scala.collection.SortedMap
import scala.collection.immutable.Queue

class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLogging {

class ElectrumWatcher(blockCount: AtomicLong, client: ActorRef) extends Actor with Stash with ActorLogging {

client ! ElectrumClient.AddStatusListener(self)

Expand Down Expand Up @@ -162,7 +163,7 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
case ElectrumClient.ServerError(ElectrumClient.GetTransaction(txid, Some(origin: ActorRef)), _) => origin ! GetTxWithMetaResponse(txid, None, tip.time)

case PublishAsap(tx) =>
val blockCount = Globals.blockCount.get()
val blockCount = this.blockCount.get()
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
if (csvTimeout > 0) {
Expand All @@ -183,7 +184,7 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi

case WatchEventConfirmed(BITCOIN_PARENT_TX_CONFIRMED(tx), blockHeight, _, _) =>
log.info(s"parent tx of txid=${tx.txid} has been confirmed")
val blockCount = Globals.blockCount.get()
val blockCount = this.blockCount.get()
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = blockHeight + csvTimeout
if (absTimeout > blockCount) {
Expand Down Expand Up @@ -211,38 +212,3 @@ class ElectrumWatcher(client: ActorRef) extends Actor with Stash with ActorLoggi
}

}

object ElectrumWatcher extends App {

val system = ActorSystem()

import scala.concurrent.ExecutionContext.Implicits.global

class Root extends Actor with ActorLogging {
val client = context.actorOf(Props(new ElectrumClient(new InetSocketAddress("localhost", 51000), ssl = SSL.OFF)), "client")
client ! ElectrumClient.AddStatusListener(self)

override def unhandled(message: Any): Unit = {
super.unhandled(message)
log.warning(s"unhandled message $message")
}

def receive = {
case ElectrumClient.ElectrumReady(_, _, _) =>
log.info(s"starting watcher")
context become running(context.actorOf(Props(new ElectrumWatcher(client)), "watcher"))
}

def running(watcher: ActorRef): Receive = {
case watch: Watch => watcher forward watch
}
}

val root = system.actorOf(Props[Root], "root")
val scanner = new java.util.Scanner(System.in)
while (true) {
val tx = Transaction.read(scanner.nextLine())
root ! WatchSpent(root, tx.txid, 0, tx.txOut(0).publicKeyScript, BITCOIN_FUNDING_SPENT)
root ! WatchConfirmed(root, tx.txid, tx.txOut(0).publicKeyScript, 4L, BITCOIN_FUNDING_DEPTHOK)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), Some(d.channelUpdate), Some(c)), c)

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) =>
Try(Commitments.sendAdd(d.commitments, c, origin(c))) match {
Try(Commitments.sendAdd(d.commitments, c, origin(c), nodeParams.currentBlockHeight)) match {
case Success(Right((commitments1, add))) =>
if (c.commit) self ! CMD_SIGN
handleCommandSuccess(sender, d.copy(commitments = commitments1)) sending add
Expand Down
Loading

0 comments on commit 2fbf46a

Please sign in to comment.