From c925626d4ff18d828bdd01d8d9fb46359e2aa019 Mon Sep 17 00:00:00 2001 From: ZiluTian Date: Mon, 2 Sep 2024 16:07:45 +0200 Subject: [PATCH] Update --- Akka/src/main/scala/API/AkkaExp.scala | 69 +++++---- Akka/src/main/scala/API/Optimization.scala | 2 - Akka/src/main/scala/API/Simulate.scala | 139 ++++++++---------- Akka/src/main/scala/API/SimulateUntil.scala | 82 ----------- Akka/src/main/scala/core/DriverImpl.scala | 11 +- .../main/scala/core/LogControllerImpl.scala | 21 +-- Akka/src/main/scala/core/WorkerImpl.scala | 15 +- Akka/src/test/scala/GoLTileTest.scala | 3 +- Akka/src/test/scala/piccoloTest.scala | 7 +- Akka/src/test/scala/shortestPathTest.scala | 11 +- Akka/src/test/scala/simulateUntilTest.scala | 8 +- .../main/scala/meta/API/SimulationData.scala | 50 +++++++ .../scala/meta/API/SimulationSnapshot.scala | 11 -- 13 files changed, 189 insertions(+), 240 deletions(-) delete mode 100644 Akka/src/main/scala/API/SimulateUntil.scala create mode 100644 core/src/main/scala/meta/API/SimulationData.scala delete mode 100644 core/src/main/scala/meta/API/SimulationSnapshot.scala diff --git a/Akka/src/main/scala/API/AkkaExp.scala b/Akka/src/main/scala/API/AkkaExp.scala index 1f22bba7..19fdde55 100644 --- a/Akka/src/main/scala/API/AkkaExp.scala +++ b/Akka/src/main/scala/API/AkkaExp.scala @@ -2,6 +2,7 @@ package simulation.akka.API import akka.cluster.typed.Cluster import meta.runtime.Actor +import meta.API.{SimulationDataBuilder, TimeseriesBuilder} import com.typesafe.config.ConfigFactory import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} import scala.collection.JavaConversions._ @@ -9,26 +10,22 @@ import akka.actor.typed.{Behavior} import akka.actor.typed.scaladsl.Behaviors import akka.actor.NoSerializationVerificationNeeded -// import akka.actor.typed.DispatcherSelector - object AkkaExp { sealed trait Command extends NoSerializationVerificationNeeded - final case class SpawnDriver(totalWorkers: Int, totalTurn: Long) extends Command - final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int) extends Command + final case class SpawnDriver(totalWorkers: Int, totalTurn: Long, logControllerOn: Boolean) extends Command + final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int, logControllerOn: Boolean) extends Command final case class SpawnLogController(totalWorkers: Int) extends Command final case class DriverStopped() extends Command final case class WorkerStopped(workerId: Int, sims: Seq[Actor]) extends Command final case class LogControllerStopped() extends Command - + var cluster: Cluster = null var totalWorkers: Int = 0 val stoppedWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]() var activeWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]() var finalAgents: ConcurrentLinkedQueue[Actor] = new ConcurrentLinkedQueue[Actor]() - val defaultHaltCond: Iterable[Iterable[Serializable]] => Boolean = (x: Iterable[Iterable[Serializable]]) => false - var haltCond: Iterable[Iterable[Serializable]] => Boolean = null - def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor]()): Behavior[Command] = + def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor]): Behavior[Command] = Behaviors.setup { ctx => cluster = Cluster(ctx.system) this.totalWorkers = totalWorkers @@ -49,13 +46,13 @@ object AkkaExp { } else { actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, false) } - waitTillFinish(Vector.empty) + // simulateUntil supports only Standalone mode for now + waitTillFinish(Vector.empty, builder, None) } - def apply(totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor](), - cond: Iterable[Iterable[Serializable]] => Boolean = defaultHaltCond): Behavior[Command] = + def apply(totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor], haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = Behaviors.setup { ctx => cluster = Cluster(ctx.system) this.totalWorkers = totalWorkers @@ -63,16 +60,14 @@ object AkkaExp { val totalActors = actors.size var actorsPerWorker = totalActors/totalWorkers - if (cond != defaultHaltCond) { - haltCond = cond - } - stoppedWorkers.clear() activeWorkers.clear() finalAgents.clear() ctx.log.debug(f"${actorsPerWorker} actors per worker") + val logControllerOn = haltCond.isDefined || builder.isInstanceOf[TimeseriesBuilder] + // Worker id is 0-indexed if (roles.exists(p => p.startsWith("Worker"))) { ctx.log.debug(f"Creating a worker!") @@ -82,7 +77,7 @@ object AkkaExp { } else { actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn) } // Machine id is 0-indexed @@ -97,56 +92,60 @@ object AkkaExp { } else { actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn) } } if (cluster.selfMember.hasRole("Driver")) { ctx.log.debug(f"Creating a driver!") - ctx.self ! SpawnDriver(totalWorkers, totalTurn) + ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn) // Co-locate the log controller with driver - if (simulation.akka.API.OptimizationConfig.logControllerEnabled) { + if (logControllerOn) { ctx.self ! SpawnLogController(totalWorkers) } } if (cluster.selfMember.hasRole("Standalone")) { ctx.log.debug(f"Standalone mode") - ctx.self ! SpawnDriver(totalWorkers, totalTurn) - if (simulation.akka.API.OptimizationConfig.logControllerEnabled) { + ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn) + + if (logControllerOn) { ctx.self ! SpawnLogController(totalWorkers) } + for (i <- Range(0, totalWorkers)){ val containedAgents = if (i == totalWorkers-1){ actors.slice(i*actorsPerWorker, totalActors) } else { actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker) } - ctx.self ! SpawnWorker(i, containedAgents, totalWorkers) + ctx.self ! SpawnWorker(i, containedAgents, totalWorkers, logControllerOn) } } - waitTillFinish(Vector.empty) + waitTillFinish(Vector.empty, builder, haltCond) } - def waitTillFinish(finalAgents: IndexedSeq[Actor]): Behavior[Command] = { + def waitTillFinish(finalAgents: IndexedSeq[Actor], builder: SimulationDataBuilder, haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = { Behaviors.receive { (ctx, message) => message match { - case SpawnDriver(totalWorkers, totalTurn) => - val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn), "driver") + case SpawnDriver(totalWorkers, totalTurn, logControllerOn) => + val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn, logControllerOn), "driver") ctx.watchWith(driver, DriverStopped()) Behaviors.same case SpawnLogController(totalWorkers) => - val logController = if (haltCond != null) { - ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond), "logController") + val logController = if (haltCond.isDefined) { + // ctx.log.info("Conditional termination is defined!") + ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond.get, builder), "logController") } else { - ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers), "logController") + // ctx.log.info("Conditional termination is nto defined!") + ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, builder), "logController") } ctx.watchWith(logController, LogControllerStopped()) Behaviors.same - case SpawnWorker(workerId, agents, totalWorkers) => - val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers), f"worker${workerId}") + case SpawnWorker(workerId, agents, totalWorkers, logControllerOn) => + val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, logControllerOn), f"worker${workerId}") activeWorkers.add(workerId) ctx.watchWith(sim, WorkerStopped(workerId, agents)) Behaviors.same @@ -175,12 +174,12 @@ object AkkaExp { if (!stoppedWorkers.contains(workerId)){ stoppedWorkers.add(workerId) if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){ - Simulate.addStoppedAgents(finalAgents ++ agents) + builder.addAgents(finalAgents ++ agents) Behaviors.stopped {() => ctx.system.terminate() } } else { - waitTillFinish(finalAgents ++ agents) + waitTillFinish(finalAgents ++ agents, builder, haltCond) } } else { if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){ @@ -188,7 +187,7 @@ object AkkaExp { ctx.system.terminate() } } else { - waitTillFinish(finalAgents) + waitTillFinish(finalAgents, builder, haltCond) } } } else { diff --git a/Akka/src/main/scala/API/Optimization.scala b/Akka/src/main/scala/API/Optimization.scala index 87176a34..61d48ba9 100644 --- a/Akka/src/main/scala/API/Optimization.scala +++ b/Akka/src/main/scala/API/Optimization.scala @@ -7,8 +7,6 @@ case object MergedWorker extends Optimization object OptimizationConfig { var conf: Optimization = MergedWorker - var logControllerEnabled: Boolean = false - var timeseriesSchema: SimulationTimeseries = FullTimeseries // todo: tmp, fix with proper availability input diff --git a/Akka/src/main/scala/API/Simulate.scala b/Akka/src/main/scala/API/Simulate.scala index 9c39a52b..146a739e 100644 --- a/Akka/src/main/scala/API/Simulate.scala +++ b/Akka/src/main/scala/API/Simulate.scala @@ -1,101 +1,88 @@ package simulation.akka.API import com.typesafe.config.ConfigFactory -import meta.API.SimulationSnapshot +import meta.API.{SimulationData, SimulationDataBuilder, SnapshotBuilder, TimeseriesBuilder} import meta.runtime.{Actor, Message} import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.typed.ActorSystem object Simulate { - private var stoppedAgents = IndexedSeq[Actor]() + def apply(actors: IndexedSeq[Actor], totalTurn: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None): SimulationData = { - var lastWords: IndexedSeq[Message] = IndexedSeq[Message]() + require(conf.isDefinedAt("role")) // Standalone, Driver, Machine-$id + require(conf.isDefinedAt("port")) // network port + require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations + require(conf.isDefinedAt("data")) // timeseries or snapshot - def addStoppedAgents(agents: IndexedSeq[Actor]): Unit = { - stoppedAgents = agents - } - - var timeseries: Iterable[Iterable[Serializable]] = null - - def initialize(): Unit = { - stoppedAgents=IndexedSeq[Actor]() - lastWords=IndexedSeq[Message]() - } - - def driver(totalTurn: Long, port: Int = 25251): SimulationSnapshot = { - initialize() + val role: String = conf("role").asInstanceOf[String] + val port: Int = conf("port").asInstanceOf[Int] + val name: String = conf("name").asInstanceOf[String] + val dataConf: String = conf("data").asInstanceOf[String] - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Driver] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) - } - - - // Materialized (actors are all containedActors) - def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): SimulationSnapshot = { - initialize() - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Machine-$mid] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - - val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) - } - - def apply(actors: IndexedSeq[Actor], totalTurn: Long, - role: String= "Standalone", port: Int = 25251): SimulationSnapshot = { - initialize() - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [$role] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt var totalWorkers = workersPerMachine * totalMachines + println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors") - // well-formedness check - val machinePrefix = "Machine-" - val workerPrefix = "Worker-" - try { - role match { - case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting - case "Driver" => - case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines => - case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers => - case _ => throw new Exception("Invalid role!") - } - } catch { - case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)") - } if (totalWorkers > actors.size){ println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}") totalWorkers = actors.size } - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors), "SimsCluster", config) + val machinePrefix = "Machine-" + val builder: SimulationDataBuilder = if (dataConf == "timeseries") { + new TimeseriesBuilder() + } else { + new SnapshotBuilder() + } + + val ip: String = conf.getOrElse("ip", "localhost").asInstanceOf[String] + + val actorSystem = role match { + case "Standalone" => { + // local mode + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.port=$port + akka.remote.artery.canonical.hostname=localhost + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@localhost:$port"] + """).withFallback(ConfigFactory.load("application")) + ActorSystem(AkkaExp(totalTurn, workersPerMachine, builder, actors, cond), name, config) + } + case "Driver" => { + require(conf.isDefinedAt("ip")) + // By default, driver is also the seed node + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.hostname=$ip + akka.remote.artery.canonical.port=$port + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@$ip:$port"] + """).withFallback(ConfigFactory.load("application")) + ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, Vector[Actor](), None), name, config) + } + case s if s.startsWith(machinePrefix) => { + require(conf.isDefinedAt("ip")) + require(conf.isDefinedAt("seed")) // ip:port + val seed: String = conf("seed").asInstanceOf[String] + val config = ConfigFactory.parseString(s""" + akka.remote.artery.canonical.hostname=$ip + akka.remote.artery.canonical.port=$port + akka.cluster.roles = [$role] + akka.cluster.seed-nodes = ["akka://$name@$seed"] + """).withFallback(ConfigFactory.load("application")) + + // 0-based + val mid = s.stripPrefix(machinePrefix).toInt + assert(mid < totalMachines) + ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, builder, actors), name, config) + } + case _ => throw new Exception("Invalid role! Supported roles are Standalone, Driver, and Machine-$id (o-based)") + } Await.ready(actorSystem.whenTerminated, 10.days) + println("Simulation ends!") - SimulationSnapshot(stoppedAgents, lastWords) + builder.build() } -} +} \ No newline at end of file diff --git a/Akka/src/main/scala/API/SimulateUntil.scala b/Akka/src/main/scala/API/SimulateUntil.scala deleted file mode 100644 index 7b0168fa..00000000 --- a/Akka/src/main/scala/API/SimulateUntil.scala +++ /dev/null @@ -1,82 +0,0 @@ -package simulation.akka.API - -import com.typesafe.config.ConfigFactory -import meta.API.SimulationSnapshot -import meta.runtime.{Actor, Message} -import scala.concurrent.Await -import scala.concurrent.duration._ -import akka.actor.typed.ActorSystem - -class SimulateUntil { - OptimizationConfig.logControllerEnabled = true - var timeseries: Iterable[Iterable[Serializable]] = null - - def driver(totalTurn: Long, port: Int = 25251): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Driver] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } - - - // Materialized (actors are all containedActors) - def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [Machine-$mid] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers") - - val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } - - def apply(actors: IndexedSeq[Actor], totalTurn: Long, - cond: Iterable[Iterable[Serializable]] => Boolean, role: String= "Standalone", port: Int = 25251): Unit = { - val config = ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port=$port - akka.cluster.roles = [$role] - """).withFallback(ConfigFactory.load("application")) - // If there are more workers than agents, then set the worker number to the same as agents - val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt - val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt - var totalWorkers = workersPerMachine * totalMachines - println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors") - // well-formedness check - val machinePrefix = "Machine-" - val workerPrefix = "Worker-" - try { - role match { - case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting - case "Driver" => - case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines => - case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers => - case _ => throw new Exception("Invalid role!") - } - } catch { - case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)") - } - - if (totalWorkers > actors.size){ - println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}") - totalWorkers = actors.size - } - - val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors, cond), "SimsCluster", config) - Await.ready(actorSystem.whenTerminated, 10.days) - println("Simulation ends!") - } -} diff --git a/Akka/src/main/scala/core/DriverImpl.scala b/Akka/src/main/scala/core/DriverImpl.scala index ece1d68b..b2774749 100644 --- a/Akka/src/main/scala/core/DriverImpl.scala +++ b/Akka/src/main/scala/core/DriverImpl.scala @@ -24,13 +24,12 @@ class Driver { private var acceptedInterval: Int = 0 private var availability: Int = simulation.akka.API.OptimizationConfig.availability - private val logControllerEnabled = simulation.akka.API.OptimizationConfig.logControllerEnabled var start: Long = 0 var end: Long = 0 var initialStart: Long = 0 - def apply(workers: Int, maxTurn: Long): Behavior[DriverEvent] = Behaviors.setup {ctx => + def apply(workers: Int, maxTurn: Long, logControllerEnabled: Boolean): Behavior[DriverEvent] = Behaviors.setup {ctx => totalWorkers = workers totalTurn = maxTurn currentTurn = 0 @@ -71,10 +70,10 @@ class Driver { if (logControllerEnabled) { ctx.system.receptionist ! Receptionist.Subscribe(LogControllerSpec.LoggerStopServiceKey, workerSub) } - driver() + driver(logControllerEnabled) } - def driver(): Behavior[DriverEvent] = + def driver(logControllerEnabled: Boolean): Behavior[DriverEvent] = Behaviors.receive[DriverEvent] { (ctx, message) => message match { case InitializeWorkers() => @@ -109,7 +108,7 @@ class Driver { RoundEnd() }, timeout=1000.seconds).apply()) - driver() + driver(logControllerEnabled) } case RoundEnd() => @@ -125,7 +124,7 @@ class Driver { } else { ctx.self ! RoundStart() } - driver() + driver(logControllerEnabled) case LogControllerFinished() => Behaviors.stopped {() => diff --git a/Akka/src/main/scala/core/LogControllerImpl.scala b/Akka/src/main/scala/core/LogControllerImpl.scala index 4ca37b40..f4afaf75 100644 --- a/Akka/src/main/scala/core/LogControllerImpl.scala +++ b/Akka/src/main/scala/core/LogControllerImpl.scala @@ -1,5 +1,6 @@ package simulation.akka.core +import meta.API.SimulationDataBuilder import java.util.concurrent.{ConcurrentHashMap} import akka.actor.typed.receptionist.{Receptionist} import java.util.concurrent.atomic.AtomicInteger @@ -28,15 +29,15 @@ class LogController { var interruptDriver: Set[ActorRef[DriverSpec.InterruptDriver]] = Set() var haltCond: Iterable[Iterable[Serializable]] => Boolean = null - def apply(workers: Int): Behavior[LogControllerEvent] = Behaviors.setup {ctx => + def apply(workers: Int, builder: SimulationDataBuilder): Behavior[LogControllerEvent] = Behaviors.setup {ctx => totalWorkers = workers // Let workers and driver discover the log controller ctx.system.receptionist ! Receptionist.Register(LoggerAggregateServiceKey, ctx.self) ctx.system.receptionist ! Receptionist.Register(LoggerStopServiceKey, ctx.self) - logController() + logController(builder) } - def apply(workers: Int, haltCond: Iterable[Iterable[Serializable]] => Boolean): Behavior[LogControllerEvent] = Behaviors.setup {ctx => + def apply(workers: Int, haltCond: Iterable[Iterable[Serializable]] => Boolean, builder: SimulationDataBuilder): Behavior[LogControllerEvent] = Behaviors.setup {ctx => totalWorkers = workers this.haltCond = haltCond // Let workers and driver discover the log controller @@ -53,10 +54,10 @@ class LogController { } ctx.system.receptionist ! Receptionist.Subscribe(DriverSpec.InterruptDriverServiceKey, logControllerSub) - logControllerWithInterrupt() + logControllerWithInterrupt(builder) } - def logController(): Behavior[LogControllerEvent] = + def logController(builder: SimulationDataBuilder): Behavior[LogControllerEvent] = Behaviors.receive[LogControllerEvent] { (ctx, message) => message match { case AggregateLog(wid, time, agents) => @@ -76,7 +77,7 @@ class LogController { } // wait up to $timeout$ ms for the rest of log to arrive from workers if ((reducedTimeseries.containsKey(time)) || ((System.currentTimeMillis() - firstReceivedStop) > timeout)) { - simulation.akka.API.Simulate.timeseries = reducedTimeseries.toSeq.sortBy(_._1).map(_._2) + builder.addTimeseries(reducedTimeseries.toSeq.sortBy(_._1).map(_._2)) replyTo ! LogControllerFinished() Behaviors.stopped {() => ctx.log.info("Time series has " + indexedTimeseries.size + " entries") @@ -84,11 +85,11 @@ class LogController { } } ctx.self ! Stop(time, replyTo) - logController() + logController(builder) } } - def logControllerWithInterrupt(): Behavior[LogControllerEvent] = + def logControllerWithInterrupt(builder: SimulationDataBuilder): Behavior[LogControllerEvent] = Behaviors.receive[LogControllerEvent] { (ctx, message) => message match { case RegisterDriverInterrupt() => @@ -118,7 +119,7 @@ class LogController { } // wait up to $timeout$ ms for the rest of log to arrive from workers if ((reducedTimeseries.containsKey(time)) || ((System.currentTimeMillis() - firstReceivedStop) > timeout)) { - simulation.akka.API.Simulate.timeseries = timeseries.toList + builder.addTimeseries(timeseries) replyTo ! LogControllerFinished() Behaviors.stopped {() => ctx.log.info("Time series has " + indexedTimeseries.size + " entries") @@ -126,7 +127,7 @@ class LogController { } } ctx.self ! Stop(time, replyTo) - logController() + logController(builder) } } } diff --git a/Akka/src/main/scala/core/WorkerImpl.scala b/Akka/src/main/scala/core/WorkerImpl.scala index a7593cf6..bdecd393 100644 --- a/Akka/src/main/scala/core/WorkerImpl.scala +++ b/Akka/src/main/scala/core/WorkerImpl.scala @@ -36,9 +36,8 @@ class Worker { private var completedAgents: Long = 0 private var registeredWorkers: AtomicInteger = new AtomicInteger(0) - private val logControllerEnabled = simulation.akka.API.OptimizationConfig.logControllerEnabled - def apply(id: Int, sims: Seq[Actor], totalWorkers: Int): Behavior[WorkerEvent] = Behaviors.setup { ctx => + def apply(id: Int, sims: Seq[Actor], totalWorkers: Int, logControllerEnabled: Boolean): Behavior[WorkerEvent] = Behaviors.setup { ctx => localSims = sims.map(x => (x.id, x)).toMap totalAgents = sims.size this.totalWorkers = totalWorkers @@ -80,15 +79,15 @@ class Worker { if (logControllerEnabled) { ctx.system.receptionist ! Receptionist.Subscribe(LogControllerSpec.LoggerAggregateServiceKey, workerSub) } - worker() + worker(logControllerEnabled) } // Consider replacing receivedWorkers with a total workers - private def worker(): Behavior[WorkerEvent] = + private def worker(logControllerEnabled: Boolean): Behavior[WorkerEvent] = Behaviors.receive[WorkerEvent] { (ctx, message) => message match { case Prepare() => - worker() + worker(logControllerEnabled) case ReceiveAgentMap(wid, nameIds, reply) => peerWorkers.computeIfAbsent(wid, x => { @@ -129,7 +128,7 @@ class Worker { if (receivedWorkers.keys().size == totalWorkers-1){ ctx.self ! RoundStart() } - worker() + worker(logControllerEnabled) case RoundStart() => ctx.log.debug(f"Worker ${workerId} starts! Received from ${receivedWorkers.keys().toSet}") @@ -170,7 +169,7 @@ class Worker { }) } ctx.self ! AgentsCompleted() - worker() + worker(logControllerEnabled) case ExpectedReceives(replyTo, acceptedInterval, availability) => // send out messages to other workers only at the beginning of a round to avoid race condition @@ -188,7 +187,7 @@ class Worker { if (receivedWorkers.keys().size == totalWorkers-1){ ctx.self ! RoundStart() } - worker() + worker(logControllerEnabled) case AgentsCompleted() => end = System.currentTimeMillis() diff --git a/Akka/src/test/scala/GoLTileTest.scala b/Akka/src/test/scala/GoLTileTest.scala index 3292dfd7..e73a4feb 100644 --- a/Akka/src/test/scala/GoLTileTest.scala +++ b/Akka/src/test/scala/GoLTileTest.scala @@ -222,6 +222,7 @@ class GoLTileTest extends FlatSpec { agents.foreach(a => { a.msgGenerator = a.connectedAgentIds.map(i => (i, a.tile.tbs(tiles(i.toInt)))).toMap }) - val snapshot1 = API.Simulate(agents, 200) + val conf = Map("role" -> "Standalone", "port" -> 25100, "name" -> "GoLTile", "data" -> "snapshot") + val snapshot1 = API.Simulate(agents, 200, conf) } } \ No newline at end of file diff --git a/Akka/src/test/scala/piccoloTest.scala b/Akka/src/test/scala/piccoloTest.scala index a0933bc7..311d4aeb 100644 --- a/Akka/src/test/scala/piccoloTest.scala +++ b/Akka/src/test/scala/piccoloTest.scala @@ -3,15 +3,16 @@ package test import simulation.akka.API._ import org.scalatest.FlatSpec +import meta.API.{SimulationData} class piccolo extends FlatSpec { val totalRounds: Int = 100 f"The page rank algorithm with vertices, sequential workers" should f"complete" in { val agents = generated.example.piccolo.InitData() - API.OptimizationConfig.logControllerEnabled = true API.OptimizationConfig.timeseriesSchema = FullTimeseries - val snapshot1 = API.Simulate(agents, totalRounds) - API.Simulate.timeseries.foreach(t => { println(t) }) + val conf = Map("role" -> "Standalone", "port" ->25400, "name" -> "Piccolo", "data" -> "timeseries") + val ts: SimulationData = API.Simulate(agents, totalRounds, conf) + ts.timeseries.foreach(t => { println(t) }) } } \ No newline at end of file diff --git a/Akka/src/test/scala/shortestPathTest.scala b/Akka/src/test/scala/shortestPathTest.scala index 84188974..c0082039 100644 --- a/Akka/src/test/scala/shortestPathTest.scala +++ b/Akka/src/test/scala/shortestPathTest.scala @@ -3,6 +3,7 @@ package test import org.scalatest.FlatSpec import simulation.akka.API._ +import meta.API.{SimulationData, Timeseries} class shortestPath extends FlatSpec { val totalVertices: Int = 50 @@ -23,10 +24,12 @@ class shortestPath extends FlatSpec { f"The single source shortest path algorithm over a linked list with ${totalVertices} vertices, sequential workers" should f"update the distance of all vertices in ${totalVertices} rounds" in { val agents = generated.core.test.shortestPath.InitData() - API.OptimizationConfig.logControllerEnabled = true API.OptimizationConfig.timeseriesSchema = ShortestPathTimeseries - val snapshot1 = API.Simulate(agents, totalRounds) - // assert(snapshot1.sims.map(i => i.asInstanceOf[generated.core.test.shortestPath.Vertex].dist).toSet == Range(0, totalVertices).toSet) - API.Simulate.timeseries.foreach(t => { println(t) }) + val conf = Map("role" -> "Standalone", + "port" -> 25300, + "name" -> "ShortestPath", + "data" -> "timeseries") + val ts = API.Simulate(agents, totalRounds, conf) + ts.timeseries.foreach(t => { println(t) }) } } \ No newline at end of file diff --git a/Akka/src/test/scala/simulateUntilTest.scala b/Akka/src/test/scala/simulateUntilTest.scala index f5592d51..a6eb2453 100644 --- a/Akka/src/test/scala/simulateUntilTest.scala +++ b/Akka/src/test/scala/simulateUntilTest.scala @@ -13,12 +13,16 @@ class simulateUntilTest extends FlatSpec { val population = 10000 val graph = cloudcity.lib.Graph.ErdosRenyiGraph(population, 0.01) val agents = generated.example.epidemic.InitData(graph) - (new API.SimulateUntil()).apply(agents, totalRounds, (ts: Iterable[Iterable[Serializable]]) => { + val conf = Map("role" -> "Standalone", + "port" -> 25200, + "name" -> "Epidemics", + "data" -> "timeseries") + Simulate.apply(agents, totalRounds, conf, Some((ts: Iterable[Iterable[Serializable]]) => { val x = ts.last.filter(i => i match { case y: generated.example.epidemic.Person => y.health == 1 }).size println("Total infected agents: " + x) x > population / 2 - }) + })) } } \ No newline at end of file diff --git a/core/src/main/scala/meta/API/SimulationData.scala b/core/src/main/scala/meta/API/SimulationData.scala new file mode 100644 index 00000000..d8fb77f9 --- /dev/null +++ b/core/src/main/scala/meta/API/SimulationData.scala @@ -0,0 +1,50 @@ +package meta.API + +import meta.runtime.{Actor, Message} +import scala.collection.mutable.Buffer + +// The distinction is for performance +sealed trait SimulationData { + def sims: Traversable[Actor] = ??? + def messages: Traversable[Message] = ??? + def timeseries: Iterable[Iterable[Serializable]] = ??? +} + +sealed case class Snapshot(override val sims: Traversable[Actor], override val messages: Traversable[Message]=List()) extends SimulationData +sealed case class Timeseries(override val timeseries: Iterable[Iterable[Serializable]]) extends SimulationData + +sealed trait SimulationDataBuilder { + def addAgents(agents: IndexedSeq[Actor]): Unit = {} + def addMessages(msgs: IndexedSeq[Message]): Unit = {} + def addTimeseries(ts: Iterable[Iterable[Serializable]]): Unit = {} + def build(): SimulationData +} + +sealed class SnapshotBuilder extends SimulationDataBuilder { + private val p_sims: Buffer[Actor] = Buffer[Actor]() + private val p_messages: Buffer[Message] = Buffer[Message]() + + override def addAgents(agents: IndexedSeq[Actor]): Unit = { + p_sims ++= agents + } + + override def addMessages(msgs: IndexedSeq[Message]): Unit = { + p_messages ++= msgs + } + + override def build(): Snapshot = { + Snapshot(p_sims.toList, p_messages.toList) + } +} + +sealed class TimeseriesBuilder extends SimulationDataBuilder { + private var timeseries: Iterable[Iterable[Serializable]] = Iterable.empty + + override def addTimeseries(ts: Iterable[Iterable[Serializable]]): Unit = { + timeseries = timeseries ++ ts + } + + override def build(): Timeseries = { + Timeseries(timeseries) + } +} \ No newline at end of file diff --git a/core/src/main/scala/meta/API/SimulationSnapshot.scala b/core/src/main/scala/meta/API/SimulationSnapshot.scala deleted file mode 100644 index 95914e67..00000000 --- a/core/src/main/scala/meta/API/SimulationSnapshot.scala +++ /dev/null @@ -1,11 +0,0 @@ -package meta.API - -import meta.runtime.{Actor, Message} - -/** - * Capture the state of the simulation. - * @param actors the Sims at the current state - * @param messages the list of in-transit messages - */ -case class SimulationSnapshot(val sims: Traversable[Actor], val messages: Traversable[Message]=List()){ -}