Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
ZiluTian committed Sep 2, 2024
1 parent 440bc2a commit c925626
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 240 deletions.
69 changes: 34 additions & 35 deletions Akka/src/main/scala/API/AkkaExp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,30 @@ package simulation.akka.API

import akka.cluster.typed.Cluster
import meta.runtime.Actor
import meta.API.{SimulationDataBuilder, TimeseriesBuilder}
import com.typesafe.config.ConfigFactory
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import scala.collection.JavaConversions._
import akka.actor.typed.{Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.NoSerializationVerificationNeeded

// import akka.actor.typed.DispatcherSelector

object AkkaExp {
sealed trait Command extends NoSerializationVerificationNeeded
final case class SpawnDriver(totalWorkers: Int, totalTurn: Long) extends Command
final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int) extends Command
final case class SpawnDriver(totalWorkers: Int, totalTurn: Long, logControllerOn: Boolean) extends Command
final case class SpawnWorker(workerId: Int, sims: Seq[Actor], totalWorkers: Int, logControllerOn: Boolean) extends Command
final case class SpawnLogController(totalWorkers: Int) extends Command
final case class DriverStopped() extends Command
final case class WorkerStopped(workerId: Int, sims: Seq[Actor]) extends Command
final case class LogControllerStopped() extends Command

var cluster: Cluster = null
var totalWorkers: Int = 0
val stoppedWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]()
var activeWorkers: ConcurrentLinkedQueue[Int] = new ConcurrentLinkedQueue[Int]()
var finalAgents: ConcurrentLinkedQueue[Actor] = new ConcurrentLinkedQueue[Actor]()
val defaultHaltCond: Iterable[Iterable[Serializable]] => Boolean = (x: Iterable[Iterable[Serializable]]) => false
var haltCond: Iterable[Iterable[Serializable]] => Boolean = null

def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor]()): Behavior[Command] =
def materializedMachine(mid: Int, totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor]): Behavior[Command] =
Behaviors.setup { ctx =>
cluster = Cluster(ctx.system)
this.totalWorkers = totalWorkers
Expand All @@ -49,30 +46,28 @@ object AkkaExp {
} else {
actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, false)
}
waitTillFinish(Vector.empty)
// simulateUntil supports only Standalone mode for now
waitTillFinish(Vector.empty, builder, None)
}

def apply(totalTurn: Long, totalWorkers: Int, actors: IndexedSeq[Actor]=Vector[Actor](),
cond: Iterable[Iterable[Serializable]] => Boolean = defaultHaltCond): Behavior[Command] =
def apply(totalTurn: Long, totalWorkers: Int, builder: SimulationDataBuilder, actors: IndexedSeq[Actor], haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] =
Behaviors.setup { ctx =>
cluster = Cluster(ctx.system)
this.totalWorkers = totalWorkers
val roles: Set[String] = cluster.selfMember.getRoles.toSet
val totalActors = actors.size
var actorsPerWorker = totalActors/totalWorkers

if (cond != defaultHaltCond) {
haltCond = cond
}

stoppedWorkers.clear()
activeWorkers.clear()
finalAgents.clear()

ctx.log.debug(f"${actorsPerWorker} actors per worker")

val logControllerOn = haltCond.isDefined || builder.isInstanceOf[TimeseriesBuilder]

// Worker id is 0-indexed
if (roles.exists(p => p.startsWith("Worker"))) {
ctx.log.debug(f"Creating a worker!")
Expand All @@ -82,7 +77,7 @@ object AkkaExp {
} else {
actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn)
}

// Machine id is 0-indexed
Expand All @@ -97,56 +92,60 @@ object AkkaExp {
} else {
actors.slice(wid*actorsPerWorker, (wid+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(wid, containedAgents, totalWorkers, logControllerOn)
}
}

if (cluster.selfMember.hasRole("Driver")) {
ctx.log.debug(f"Creating a driver!")
ctx.self ! SpawnDriver(totalWorkers, totalTurn)
ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn)
// Co-locate the log controller with driver
if (simulation.akka.API.OptimizationConfig.logControllerEnabled) {
if (logControllerOn) {
ctx.self ! SpawnLogController(totalWorkers)
}
}

if (cluster.selfMember.hasRole("Standalone")) {
ctx.log.debug(f"Standalone mode")
ctx.self ! SpawnDriver(totalWorkers, totalTurn)
if (simulation.akka.API.OptimizationConfig.logControllerEnabled) {
ctx.self ! SpawnDriver(totalWorkers, totalTurn, logControllerOn)

if (logControllerOn) {
ctx.self ! SpawnLogController(totalWorkers)
}

for (i <- Range(0, totalWorkers)){
val containedAgents = if (i == totalWorkers-1){
actors.slice(i*actorsPerWorker, totalActors)
} else {
actors.slice(i*actorsPerWorker, (i+1)*actorsPerWorker)
}
ctx.self ! SpawnWorker(i, containedAgents, totalWorkers)
ctx.self ! SpawnWorker(i, containedAgents, totalWorkers, logControllerOn)
}
}
waitTillFinish(Vector.empty)
waitTillFinish(Vector.empty, builder, haltCond)
}

def waitTillFinish(finalAgents: IndexedSeq[Actor]): Behavior[Command] = {
def waitTillFinish(finalAgents: IndexedSeq[Actor], builder: SimulationDataBuilder, haltCond: Option[Iterable[Iterable[Serializable]] => Boolean]): Behavior[Command] = {
Behaviors.receive { (ctx, message) =>
message match {
case SpawnDriver(totalWorkers, totalTurn) =>
val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn), "driver")
case SpawnDriver(totalWorkers, totalTurn, logControllerOn) =>
val driver = ctx.spawn((new simulation.akka.core.Driver).apply(totalWorkers, totalTurn, logControllerOn), "driver")
ctx.watchWith(driver, DriverStopped())
Behaviors.same

case SpawnLogController(totalWorkers) =>
val logController = if (haltCond != null) {
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond), "logController")
val logController = if (haltCond.isDefined) {
// ctx.log.info("Conditional termination is defined!")
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, haltCond.get, builder), "logController")
} else {
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers), "logController")
// ctx.log.info("Conditional termination is nto defined!")
ctx.spawn((new simulation.akka.core.LogController).apply(totalWorkers, builder), "logController")
}
ctx.watchWith(logController, LogControllerStopped())
Behaviors.same

case SpawnWorker(workerId, agents, totalWorkers) =>
val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers), f"worker${workerId}")
case SpawnWorker(workerId, agents, totalWorkers, logControllerOn) =>
val sim = ctx.spawn((new simulation.akka.core.Worker).apply(workerId, agents, totalWorkers, logControllerOn), f"worker${workerId}")
activeWorkers.add(workerId)
ctx.watchWith(sim, WorkerStopped(workerId, agents))
Behaviors.same
Expand Down Expand Up @@ -175,20 +174,20 @@ object AkkaExp {
if (!stoppedWorkers.contains(workerId)){
stoppedWorkers.add(workerId)
if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){
Simulate.addStoppedAgents(finalAgents ++ agents)
builder.addAgents(finalAgents ++ agents)
Behaviors.stopped {() =>
ctx.system.terminate()
}
} else {
waitTillFinish(finalAgents ++ agents)
waitTillFinish(finalAgents ++ agents, builder, haltCond)
}
} else {
if (activeWorkers.toSet.diff(stoppedWorkers.toSet).isEmpty){
Behaviors.stopped {() =>
ctx.system.terminate()
}
} else {
waitTillFinish(finalAgents)
waitTillFinish(finalAgents, builder, haltCond)
}
}
} else {
Expand Down
2 changes: 0 additions & 2 deletions Akka/src/main/scala/API/Optimization.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ case object MergedWorker extends Optimization
object OptimizationConfig {
var conf: Optimization = MergedWorker

var logControllerEnabled: Boolean = false

var timeseriesSchema: SimulationTimeseries = FullTimeseries

// todo: tmp, fix with proper availability input
Expand Down
139 changes: 63 additions & 76 deletions Akka/src/main/scala/API/Simulate.scala
Original file line number Diff line number Diff line change
@@ -1,101 +1,88 @@
package simulation.akka.API

import com.typesafe.config.ConfigFactory
import meta.API.SimulationSnapshot
import meta.API.{SimulationData, SimulationDataBuilder, SnapshotBuilder, TimeseriesBuilder}
import meta.runtime.{Actor, Message}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.typed.ActorSystem

object Simulate {
private var stoppedAgents = IndexedSeq[Actor]()
def apply(actors: IndexedSeq[Actor], totalTurn: Long, conf: Map[String, Any], cond: Option[Iterable[Iterable[Serializable]] => Boolean] = None): SimulationData = {

var lastWords: IndexedSeq[Message] = IndexedSeq[Message]()
require(conf.isDefinedAt("role")) // Standalone, Driver, Machine-$id
require(conf.isDefinedAt("port")) // network port
require(conf.isDefinedAt("name")) // name of the actor system, to allow concurrent simulations
require(conf.isDefinedAt("data")) // timeseries or snapshot

def addStoppedAgents(agents: IndexedSeq[Actor]): Unit = {
stoppedAgents = agents
}

var timeseries: Iterable[Iterable[Serializable]] = null

def initialize(): Unit = {
stoppedAgents=IndexedSeq[Actor]()
lastWords=IndexedSeq[Message]()
}

def driver(totalTurn: Long, port: Int = 25251): SimulationSnapshot = {
initialize()
val role: String = conf("role").asInstanceOf[String]
val port: Int = conf("port").asInstanceOf[Int]
val name: String = conf("name").asInstanceOf[String]
val dataConf: String = conf("data").asInstanceOf[String]

val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [Driver]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines
println(f"${totalMachines} total machines, ${totalWorkers} total workers")
val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers), "SimsCluster", config)
Await.ready(actorSystem.whenTerminated, 10.days)
println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
}


// Materialized (actors are all containedActors)
def machine(mid: Int, actors: IndexedSeq[Actor], totalTurn: Long, port: Int = 0): SimulationSnapshot = {
initialize()
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [Machine-$mid]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines
println(f"${totalMachines} total machines, ${totalWorkers} total workers")

val actorSystem = ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, actors), "SimsCluster", config)
Await.ready(actorSystem.whenTerminated, 10.days)
println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
}

def apply(actors: IndexedSeq[Actor], totalTurn: Long,
role: String= "Standalone", port: Int = 25251): SimulationSnapshot = {
initialize()
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
""").withFallback(ConfigFactory.load("application"))
// If there are more workers than agents, then set the worker number to the same as agents
val workersPerMachine: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.workers-per-machine").render().toInt
val totalMachines: Int = ConfigFactory.load("driver-worker").getValue("driver-worker.total-machines").render().toInt
var totalWorkers = workersPerMachine * totalMachines

println(f"${totalMachines} total machines, ${totalWorkers} total workers, and ${actors.size} actors")
// well-formedness check
val machinePrefix = "Machine-"
val workerPrefix = "Worker-"
try {
role match {
case "Standalone" => totalWorkers = workersPerMachine // ignore totalMachine setting
case "Driver" =>
case s if s.startsWith(machinePrefix) && s.stripPrefix(machinePrefix).toInt < totalMachines =>
case s if s.startsWith(workerPrefix) && s.stripPrefix(workerPrefix).toInt < totalWorkers =>
case _ => throw new Exception("Invalid role!")
}
} catch {
case e: Exception => throw new Exception(f"Invalid role ${role}. Available roles are Standalone, Driver, Machine-id, or Worker-id. Replacing id with 0-based int (less than total machines or workers)")
}

if (totalWorkers > actors.size){
println(f"Found more workers than agents! Set total workers from ${totalWorkers} to ${actors.size}")
totalWorkers = actors.size
}

val actorSystem = ActorSystem(AkkaExp(totalTurn, totalWorkers, actors), "SimsCluster", config)
val machinePrefix = "Machine-"
val builder: SimulationDataBuilder = if (dataConf == "timeseries") {
new TimeseriesBuilder()
} else {
new SnapshotBuilder()
}

val ip: String = conf.getOrElse("ip", "localhost").asInstanceOf[String]

val actorSystem = role match {
case "Standalone" => {
// local mode
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.remote.artery.canonical.hostname=localhost
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@localhost:$port"]
""").withFallback(ConfigFactory.load("application"))
ActorSystem(AkkaExp(totalTurn, workersPerMachine, builder, actors, cond), name, config)
}
case "Driver" => {
require(conf.isDefinedAt("ip"))
// By default, driver is also the seed node
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.hostname=$ip
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@$ip:$port"]
""").withFallback(ConfigFactory.load("application"))
ActorSystem(AkkaExp(totalTurn, totalWorkers, builder, Vector[Actor](), None), name, config)
}
case s if s.startsWith(machinePrefix) => {
require(conf.isDefinedAt("ip"))
require(conf.isDefinedAt("seed")) // ip:port
val seed: String = conf("seed").asInstanceOf[String]
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.hostname=$ip
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
akka.cluster.seed-nodes = ["akka://$name@$seed"]
""").withFallback(ConfigFactory.load("application"))

// 0-based
val mid = s.stripPrefix(machinePrefix).toInt
assert(mid < totalMachines)
ActorSystem(AkkaExp.materializedMachine(mid, totalTurn, totalWorkers, builder, actors), name, config)
}
case _ => throw new Exception("Invalid role! Supported roles are Standalone, Driver, and Machine-$id (o-based)")
}
Await.ready(actorSystem.whenTerminated, 10.days)

println("Simulation ends!")
SimulationSnapshot(stoppedAgents, lastWords)
builder.build()
}
}
}
Loading

0 comments on commit c925626

Please sign in to comment.