[WIP] spark 3.0 (intel-analytics#3054)
* spark 3.0
Le-Zheng committed Sep 27, 2020
1 parent e8cb5ae commit 8570d4d
Showing 18 changed files with 591 additions and 49 deletions.
4 changes: 2 additions & 2 deletions spark/dl/pom.xml
@@ -135,7 +135,7 @@
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.major.version}</artifactId>
<version>3.2.0</version>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
@@ -190,7 +190,7 @@
or shade plugin will be executed after assembly plugin. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>3.2.1</version>
<configuration>
<filters>
<filter>
@@ -259,6 +259,8 @@ class CaffeLoader[T: ClassTag](prototxtPath: String, modelPath: String,
parameterTable.foreach {
case (name: String, params: Table) =>
copyParameter(name, params)
case unsupported =>
throw new UnsupportedOperationException(s"unsupported entry $unsupported")
}
model
}
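Note: this hunk and the matching ones in AbstractModule and LSTMPeepholeSpec below add a catch-all case so the pattern matches over parameter tables stay exhaustive under the Scala 2.12 compiler that Spark 3.0 builds against. A minimal sketch of the pattern, with a plain map standing in for the BigDL parameter table (names and value types are hypothetical):

val parameterTable: Map[Any, Any] = Map("conv1" -> Seq(0.1, 0.2))

parameterTable.foreach {
  case (name: String, params: Seq[_]) =>
    println(s"copying $name (${params.size} values)")
  case unsupported =>
    // bind the element so the error can report it; a plain "$name" string
    // without the s-interpolator would print the placeholder literally
    throw new UnsupportedOperationException(s"unsupported entry $unsupported")
}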
@@ -121,7 +121,7 @@ class Pooler[T: ClassTag] (
val num_rois = rois.size(1)
totalNum += num_rois

if (out.getOrElse(i + 1, null) == null) out(i + 1) = Tensor[T]()
if (!out.contains(i + 1)) out(i + 1) = Tensor[T]()
val outROI = out[Tensor[T]](i + 1)
outROI.resize(num_rois, num_channels, resolution, resolution)
.fill(ev.fromType[Float](Float.MinValue))
@@ -143,7 +143,7 @@ class RegionProposal(
val postNmsTopN = if (this.isTraining()) min(postNmsTopNTrain, bboxNumber)
else min(postNmsTopNTest, bboxNumber)

if (output.getOrElse(b, null) == null) {
if (!output.contains(b)) {
output(b) = Tensor[Float]()
}
output[Tensor[Float]](b).resize(postNmsTopN, 4)
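The Pooler and RegionProposal hunks above swap a null probe (out.getOrElse(i + 1, null) == null) for an explicit key check, which reads more clearly and avoids passing null where a Tensor is expected. A rough sketch of the idiom with a mutable map standing in for the BigDL Table (the real code uses Table.contains and Tensor, as shown above):

import scala.collection.mutable

val out = mutable.Map[Int, Array[Float]]()   // stand-ins: Map for Table, Array[Float] for Tensor
val i = 0

if (!out.contains(i + 1)) out(i + 1) = Array.empty[Float]  // create the slot only when it is missing
val outROI = out(i + 1)                                     // then fetch and resize/fill as needed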
@@ -1064,6 +1064,8 @@ abstract class AbstractModule[A <: Activity: ClassTag, B <: Activity: ClassTag,
require(copiedModuleParamTable.get(name) != None, s"cloned module should have for $name")
setLayerWeightAndBias(params,
copiedModuleParamTable.get(name).get.asInstanceOf[Table], deepCopy)
case unsupported =>
throw new UnsupportedOperationException(s"unsupported entry $unsupported")
}
}
}
@@ -1125,6 +1127,8 @@
} else {
if (matchAll) new Exception(s"module $name cannot find corresponding weight bias")
}
case unsupported =>
throw new UnsupportedOperationException(s"unsupported entry $unsupported")
}
}

@@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.log4j.Logger
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
@@ -195,8 +196,8 @@ object DistriOptimizer extends AbstractOptimizer {
var dataRDD = dataset.data(train = true)

while (!endWhen(driverState)) {
val lossSum = sc.accumulator(0.0, "loss sum")
val recordsNum = sc.accumulator(0, "record number")
val lossSum = sc.doubleAccumulator("loss sum")
val recordsNum = sc.doubleAccumulator("record number")
metrics.set("computing time for each node", mutable.ArrayBuffer[Double](), sc)
metrics.set("get weights for each node", mutable.ArrayBuffer[Double](), sc)
metrics.set("computing time average", 0.0, sc, partitionNum)
@@ -293,10 +294,10 @@ object DistriOptimizer extends AbstractOptimizer {
driverMetrics.add("computing time for each node", computingTime)

val finishedThreads = trainingThreads.filter(!_.isCancelled).map(_.get())
recordsNum += finishedThreads.size * stackSize
recordsNum.add(finishedThreads.size * stackSize)
var i = 0
while (i < finishedThreads.size) {
lossSum += lossArray(finishedThreads(i))
lossSum.add(lossArray(finishedThreads(i)))
i += 1
}

@@ -409,7 +410,7 @@
}.count()

stateBroadcast.destroy()
recordsProcessedThisEpoch += recordsNum.value
recordsProcessedThisEpoch += (recordsNum.value).toInt
val end = System.nanoTime()
wallClockTime += end - start
driverState("isGradientUpdated") = true
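The driver-side counters in DistriOptimizer (and in DistriOptimizerV2 below) move off the SparkContext.accumulator API, which was removed in Spark 3.0, onto the AccumulatorV2 family: doubleAccumulator here, with add(...) replacing += and .value returning a boxed java.lang.Double, hence the explicit .toInt conversion. A minimal sketch of the migration, assuming only a live SparkContext named sc and an RDD of per-batch losses (a longAccumulator would also work for the record counter; the commit keeps a DoubleAccumulator and converts at the end):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def sumLossAndRecords(sc: SparkContext, losses: RDD[Double], stackSize: Int): (Double, Int) = {
  // Spark 2.x: sc.accumulator(0.0, "loss sum") / sc.accumulator(0, "record number")
  val lossSum = sc.doubleAccumulator("loss sum")
  val recordsNum = sc.doubleAccumulator("record number")

  losses.foreach { loss =>
    lossSum.add(loss)          // AccumulatorV2 exposes add(...), not +=
    recordsNum.add(stackSize)
  }

  val totalLoss: Double = lossSum.value        // boxed java.lang.Double unboxes implicitly
  val totalRecords: Int = recordsNum.value.toInt
  (totalLoss, totalRecords)
}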
@@ -168,8 +168,8 @@ object DistriOptimizerV2 extends AbstractOptimizer {
cacheOfMaster: MasterCache[T],
context: TrainingContext[T], trainingTrace: TrainingTrace
)(implicit ev: TensorNumeric[T]): Unit = {
val lossSum = sc.accumulator(0.0, "loss sum")
val recordsNum = sc.accumulator(0, "record number")
val lossSum = sc.doubleAccumulator("loss sum")
val recordsNum = sc.doubleAccumulator("record number")
val metrics = cacheOfMaster.metrics
val partitionNum = cacheOfMaster.partitionNum
initMetrics(sc, metrics, partitionNum)
@@ -202,16 +202,16 @@

val results = train(cached, miniBatchBuffer, context, metrics)

lossSum += results.loss
recordsNum += results.records
lossSum.add(results.loss)
recordsNum.add(results.records)

Iterator.single(results.successed)
}.reduce(_ + _)

parameterSync(lossSum.value, successModels, cacheOfMaster, models, context)
})

driverStatesUpdate(cacheOfMaster, recordsNum.value,
driverStatesUpdate(cacheOfMaster, (recordsNum.value).toInt,
context, trainingTrace, metrics)
}

@@ -240,10 +240,6 @@
parameterProcessers: Array[ParameterProcessor]
)

case class Replica(model: Module[T], weights: Tensor[T], gradients: Tensor[T],
criterion: Criterion[T], state: Table,
validationMethods: Option[Array[ValidationMethod[T]]])

val config = TrainingConfig(
cacheOfMaster.criterion,
cacheOfMaster.validationMethods,
@@ -1056,6 +1052,10 @@ private case object AGGREGATE_PARTITION_GRADIENT extends MetricEntry("aggregrate
// scalastyle:on
private case object SEND_WEIGHTS_AVERAGE extends MetricEntry("send weights average")

private case class Replica[T](model: Module[T], weights: Tensor[T], gradients: Tensor[T],
criterion: Criterion[T], state: Table,
validationMethods: Option[Array[ValidationMethod[T]]])

private class TrainingTrace(
private var _records: Int = 0,
private var _iterations: Int = 0,
@@ -17,7 +17,8 @@
package com.intel.analytics.bigdl.optim

import com.google.common.util.concurrent.AtomicDouble
import org.apache.spark.{Accumulable, Accumulator, SparkContext}
import org.apache.spark.SparkContext
import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator}

import scala.collection.mutable.{ArrayBuffer, Map}

@@ -41,11 +42,12 @@ class Metrics extends Serializable {
}

if (aggregateDistributeMetricsMap.contains(name)) {
aggregateDistributeMetricsMap(name).value += value
aggregateDistributeMetricsMap(name).value.add(value)
}

if (distributeMetricsMap.contains(name)) {
distributeMetricsMap(name).value += value
var bufferValue = ArrayBuffer(value)
distributeMetricsMap(name).value.add(bufferValue)
}
this
}
@@ -65,11 +67,13 @@
def set(name: String, value: Double, sc: SparkContext, parallel: Int): this.type = {
require(!localMetricsMap.contains(name), "duplicated local metric")
if (aggregateDistributeMetricsMap.contains(name)) {
aggregateDistributeMetricsMap(name).value.setValue(value)
aggregateDistributeMetricsMap(name).value.reset()
aggregateDistributeMetricsMap(name).value.add(value)
aggregateDistributeMetricsMap(name).parallel = parallel
} else {
aggregateDistributeMetricsMap(name) =
AggregateDistributeMetricsEntry(sc.accumulator(value, name), parallel)
AggregateDistributeMetricsEntry(sc.doubleAccumulator(name), parallel)
aggregateDistributeMetricsMap(name).value.add(value)
}
this
}
@@ -78,9 +82,13 @@
require(!localMetricsMap.contains(name), "duplicated local metric")
require(!aggregateDistributeMetricsMap.contains(name), "duplicated distribute metric")
if (distributeMetricsMap.contains(name)) {
distributeMetricsMap(name).value.setValue(value)
distributeMetricsMap(name).value.reset()
distributeMetricsMap(name).value.add(value)
} else {
distributeMetricsMap(name) = DistributeMetricsEntry(sc.accumulableCollection(value))
val accumulableCollection = new ArrayBufferAccumulator
sc.register(accumulableCollection)
distributeMetricsMap(name) = DistributeMetricsEntry(accumulableCollection)
distributeMetricsMap(name).value.add(value)
}
this
}
@@ -118,6 +126,36 @@ class Metrics extends Serializable {

private case class LocalMetricsEntry(value: AtomicDouble, var parallel: Int)

private case class AggregateDistributeMetricsEntry(value: Accumulator[Double], var parallel: Int)
private case class AggregateDistributeMetricsEntry(value: DoubleAccumulator, var parallel: Int)

private case class DistributeMetricsEntry(value: Accumulable[ArrayBuffer[Double], Double])
private case class DistributeMetricsEntry(value: ArrayBufferAccumulator)

class ArrayBufferAccumulator extends AccumulatorV2[ArrayBuffer[Double], ArrayBuffer[Double]] {
private var values = new ArrayBuffer[Double]()

def reset(): Unit = {
values.clear()
}

def value: ArrayBuffer[Double] = values

def add(v: ArrayBuffer[Double]): Unit = {
values ++= v
}

def copy(): ArrayBufferAccumulator = {
val newArrayBufferAccumulator = new ArrayBufferAccumulator
// clone the buffer so the copy does not share mutable state with this accumulator
newArrayBufferAccumulator.values = this.values.clone()
newArrayBufferAccumulator
}

def isZero: Boolean = values.isEmpty

def merge(other: AccumulatorV2[ArrayBuffer[Double], ArrayBuffer[Double]]): Unit = other match {
case o: ArrayBufferAccumulator => values ++= o.values
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}"
)
}

}
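ArrayBufferAccumulator stands in for the removed sc.accumulableCollection call used by the old distributed metrics: per-node Double measurements are now gathered through a custom AccumulatorV2 that has to be registered on the SparkContext before use. A hedged usage sketch, assuming only a live SparkContext named sc (BigDL itself registers the accumulator inside Metrics.set, as shown above):

import scala.collection.mutable.ArrayBuffer

val computingTimes = new ArrayBufferAccumulator
sc.register(computingTimes, "computing time for each node")  // custom AccumulatorV2s must be registered explicitly

val work = sc.parallelize(1 to 4, 4)
work.foreachPartition { _ =>
  val start = System.nanoTime()
  // ... per-partition work ...
  computingTimes.add(ArrayBuffer((System.nanoTime() - start) / 1e6))  // record elapsed milliseconds
}

// Back on the driver: the merged buffer holds one entry per partition.
println(computingTimes.value.mkString(", "))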
@@ -94,11 +94,12 @@ class PredictionService[T: ClassTag] private[optim](
tensor.clone()
case table: Table =>
val clonedMap = mutable.HashMap[Any, Any]()
table.getState().foreach {
table.getState().foreach { x => (x: @unchecked) match {
case (k: Tensor[_], v: Tensor[_]) =>
clonedMap += k.clone() -> v.clone()
case (k, v: Tensor[_]) =>
clonedMap += k -> v.clone()
}
}
new Table(clonedMap)
}
@@ -190,32 +191,32 @@ object PredictionService {
val tensorState: Array[(Tensor[_], Tensor[_])] = firstKey match {
case _: Tensor[_] =>
keyIsPrimitive = false
table.getState().map { case (k: Tensor[_], v: Tensor[_]) =>
k -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Tensor[_], v: Tensor[_]) =>
k -> v }}.toArray
case _: Int =>
table.getState().map { case (k: Int, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Int, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Long =>
table.getState().map { case (k: Long, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Long, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Char =>
table.getState().map { case (k: Char, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Char, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Short =>
table.getState().map { case (k: Short, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map {x => (x: @unchecked) match { case (k: Short, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Float =>
table.getState().map { case (k: Float, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Float, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Double =>
table.getState().map { case (k: Double, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Double, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: Boolean =>
table.getState().map { case (k: Boolean, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: Boolean, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case _: String =>
table.getState().map { case (k: String, v: Tensor[_]) =>
Tensor.scalar(k) -> v }.toArray
table.getState().map { x => (x: @unchecked) match { case (k: String, v: Tensor[_]) =>
Tensor.scalar(k) -> v }}.toArray
case key =>
throw new UnsupportedOperationException(s"Unsupported Table key: $key!")
}
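The PredictionService changes wrap each partial-function match in (x: @unchecked) match { ... }, presumably to keep the Scala 2.12 compiler used with Spark 3.0 from flagging the deliberately non-exhaustive tuple matches. The annotation only silences the exhaustiveness check; a value that matches no case still fails with a MatchError at runtime. A minimal sketch of the pattern on a plain map (hypothetical data, not the BigDL Table):

val state: Map[Any, Any] = Map(1 -> "one", 2L -> "two")

// Without @unchecked the compiler may warn that the match is not exhaustive;
// with it, unmatched pairs fail at runtime exactly as before.
val scalars = state.map { x => (x: @unchecked) match {
  case (k: Int, v: String) => s"int:$k=$v"
  case (k: Long, v: String) => s"long:$k=$v"
}}.toArray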
12 changes: 10 additions & 2 deletions spark/dl/src/test/integration-test.robot
@@ -12,7 +12,8 @@ Test template BigDL Test
4 Spark2.3 on Yarn Test Suite
5 Quantization Test Suite
6 PySpark2.2 Test Suite

7 PySpark3.0 Test Suite
8 Spark3.0 on Yarn Test Suite

*** Keywords ***
Build SparkJar
@@ -112,6 +113,9 @@ Spark1.6 on Yarn Test Suite
Spark2.3 on Yarn Test Suite
Yarn Test Suite spark_2.x /opt/work/spark-2.3.1-bin-hadoop2.7

Spark3.0 on Yarn Test Suite
Yarn Test Suite spark_3.x /opt/work/spark-3.0.0-bin-hadoop2.7

Yarn Test Suite
[Arguments] ${bigdl_spark_version} ${spark_home}
DownLoad Input
@@ -147,4 +151,8 @@ PySpark2.2 Test Suite
${submit}= Catenate SEPARATOR=/ /opt/work/spark-2.2.0-bin-hadoop2.7/bin spark-submit
Run Shell ${submit} --master ${spark_22_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 10g --executor-cores 14 --total-executor-cores 28 --py-files ${curdir}/dist/lib/bigdl-${version}-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/bigdl/models/lenet/lenet5.py -b 224 --action train --endTriggerType epoch --endTriggerNum 1


PySpark3.0 Test Suite
Build SparkJar spark_3.x
Set Environment Variable SPARK_HOME /opt/work/spark-3.0.0-bin-hadoop2.7
${submit}= Catenate SEPARATOR=/ /opt/work/spark-3.0.0-bin-hadoop2.7/bin spark-submit
Run Shell ${submit} --master ${spark_30_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 10g --executor-cores 14 --total-executor-cores 28 --py-files ${curdir}/dist/lib/bigdl-${version}-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/bigdl/models/lenet/lenet5.py -b 224 --action train --endTriggerType epoch --endTriggerNum 1
@@ -668,6 +668,8 @@ class LSTMPeepholeSpec extends TorchRNNSpec {
assert(abs(v1 - v2) <= 1e-8)
v1
})
case unsupported =>
throw new UnsupportedOperationException(s"unsupported type: $unsupported")
}

luaOutput.map(output, (v1, v2) => {