From 490e96abf4438ef7283e9709342ea460e8cca6b4 Mon Sep 17 00:00:00 2001
From: Le-Zheng <30695225+Le-Zheng@users.noreply.github.com>
Date: Sun, 27 Sep 2020 02:10:54 +0100
Subject: [PATCH] [WIP] spark 3.0 (#3054)

* spark 3.0
---
 spark/dl/pom.xml | 4 +-
 .../com/intel/analytics/bigdl/nn/Pooler.scala | 2 +-
 .../analytics/bigdl/nn/RegionProposal.scala | 2 +-
 .../bigdl/nn/abstractnn/AbstractModule.scala | 4 +
 .../bigdl/optim/DistriOptimizer.scala | 11 +-
 .../bigdl/optim/DistriOptimizerV2.scala | 18 +-
 .../intel/analytics/bigdl/optim/Metrics.scala | 56 +++++-
 .../bigdl/optim/PredictionService.scala | 39 +++--
 .../bigdl/utils/caffe/CaffeLoader.scala | 2 +
 spark/dl/src/test/integration-test.robot | 12 +-
 .../integration/torch/LSTMPeepholeSpec.scala | 2 +
 spark/spark-version/3.0/pom.xml | 55 ++++++
 .../org/apache/spark/ml/DLEstimatorBase.scala | 66 +++++++
 .../apache/spark/ml/DLTransformerBase.scala | 41 +++++
 .../rdd/ZippedPartitionsWithLocalityRDD.scala | 135 +++++++++++++++
 .../org/apache/spark/sql/SqlAdapter.scala | 27 +++
 .../spark/storage/BlockManagerWrapper.scala | 162 ++++++++++++++++++
 spark/spark-version/pom.xml | 2 +-
 18 files changed, 591 insertions(+), 49 deletions(-)
 create mode 100644 spark/spark-version/3.0/pom.xml
 create mode 100644 spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLEstimatorBase.scala
 create mode 100644 spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLTransformerBase.scala
 create mode 100644 spark/spark-version/3.0/src/main/scala/org/apache/spark/rdd/ZippedPartitionsWithLocalityRDD.scala
 create mode 100644 spark/spark-version/3.0/src/main/scala/org/apache/spark/sql/SqlAdapter.scala
 create mode 100644 spark/spark-version/3.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala

diff --git a/spark/dl/pom.xml b/spark/dl/pom.xml
index 087eda80717..d06ea66ef94 100644
--- a/spark/dl/pom.xml
+++ b/spark/dl/pom.xml
@@ -135,7 +135,7 @@ com.github.scopt scopt_${scala.major.version} - 3.2.0 + 3.5.0 it.unimi.dsi
@@ -190,7 +190,7 @@ or shade plugin will be executed after assembly plugin.
--> org.apache.maven.plugins maven-shade-plugin - 3.0.0 + 3.2.1 diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/Pooler.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/Pooler.scala index 19deb3b5039..9c2e6759689 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/Pooler.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/Pooler.scala @@ -121,7 +121,7 @@ class Pooler[T: ClassTag] ( val num_rois = rois.size(1) totalNum += num_rois - if (out.getOrElse(i + 1, null) == null) out(i + 1) = Tensor[T]() + if (!out.contains(i + 1)) out(i + 1) = Tensor[T]() val outROI = out[Tensor[T]](i + 1) outROI.resize(num_rois, num_channels, resolution, resolution) .fill(ev.fromType[Float](Float.MinValue)) diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/RegionProposal.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/RegionProposal.scala index a79f7d058df..ae7114c3607 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/RegionProposal.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/RegionProposal.scala @@ -143,7 +143,7 @@ class RegionProposal( val postNmsTopN = if (this.isTraining()) min(postNmsTopNTrain, bboxNumber) else min(postNmsTopNTest, bboxNumber) - if (output.getOrElse(b, null) == null) { + if (!output.contains(b)) { output(b) = Tensor[Float]() } output[Tensor[Float]](b).resize(postNmsTopN, 4) diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/abstractnn/AbstractModule.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/abstractnn/AbstractModule.scala index 44dd26eb1f1..1c85c9a6628 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/abstractnn/AbstractModule.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/nn/abstractnn/AbstractModule.scala @@ -1064,6 +1064,8 @@ abstract class AbstractModule[A <: Activity: ClassTag, B <: Activity: ClassTag, require(copiedModuleParamTable.get(name) != None, s"cloned module should have for $name") setLayerWeightAndBias(params, copiedModuleParamTable.get(name).get.asInstanceOf[Table], deepCopy) + case _ => + throw new UnsupportedOperationException("unsupported $name and $params") } } } @@ -1125,6 +1127,8 @@ abstract class AbstractModule[A <: Activity: ClassTag, B <: Activity: ClassTag, } else { if (matchAll) new Exception(s"module $name cannot find corresponding weight bias") } + case _ => + throw new UnsupportedOperationException("unsupported $name and $targetParams") } } diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizer.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizer.scala index 19fbfa57dca..e909f506d8c 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizer.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizer.scala @@ -35,6 +35,7 @@ import org.apache.commons.lang.exception.ExceptionUtils import org.apache.log4j.Logger import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD +import org.apache.spark.util.AccumulatorV2 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent.Future @@ -195,8 +196,8 @@ object DistriOptimizer extends AbstractOptimizer { var dataRDD = dataset.data(train = true) while (!endWhen(driverState)) { - val lossSum = sc.accumulator(0.0, "loss sum") - val recordsNum = sc.accumulator(0, "record number") + val lossSum = sc.doubleAccumulator("loss sum") + val recordsNum = sc.doubleAccumulator("record number") 
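// Illustrative sketch (not part of this hunk): Spark 3.0 removes the old
// sc.accumulator(...) API, which is why the two driver-side counters above move to
// the AccumulatorV2-backed sc.doubleAccumulator(...). The standalone object below
// shows the replacement calls; the object name and local master are assumptions
// for demonstration only.
object AccumulatorMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new org.apache.spark.SparkContext(
      new org.apache.spark.SparkConf().setMaster("local[2]").setAppName("acc-sketch"))
    val lossSum = sc.doubleAccumulator("loss sum")          // was: sc.accumulator(0.0, "loss sum")
    val recordsNum = sc.doubleAccumulator("record number")  // was: sc.accumulator(0, "record number")
    sc.parallelize(1 to 4).foreach { i =>
      lossSum.add(i * 0.5)   // add(...) replaces the old "+=" operator on executors
      recordsNum.add(1)
    }
    // value returns java.lang.Double, hence the .toInt seen later in this optimizer
    println(s"loss=${lossSum.value}, records=${recordsNum.value.toInt}")
    sc.stop()
  }
}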
metrics.set("computing time for each node", mutable.ArrayBuffer[Double](), sc) metrics.set("get weights for each node", mutable.ArrayBuffer[Double](), sc) metrics.set("computing time average", 0.0, sc, partitionNum) @@ -293,10 +294,10 @@ object DistriOptimizer extends AbstractOptimizer { driverMetrics.add("computing time for each node", computingTime) val finishedThreads = trainingThreads.filter(!_.isCancelled).map(_.get()) - recordsNum += finishedThreads.size * stackSize + recordsNum.add(finishedThreads.size * stackSize) var i = 0 while (i < finishedThreads.size) { - lossSum += lossArray(finishedThreads(i)) + lossSum.add(lossArray(finishedThreads(i))) i += 1 } @@ -409,7 +410,7 @@ object DistriOptimizer extends AbstractOptimizer { }.count() stateBroadcast.destroy() - recordsProcessedThisEpoch += recordsNum.value + recordsProcessedThisEpoch += (recordsNum.value).toInt val end = System.nanoTime() wallClockTime += end - start driverState("isGradientUpdated") = true diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizerV2.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizerV2.scala index 68c774d9a3a..ec20e4646d4 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizerV2.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/DistriOptimizerV2.scala @@ -168,8 +168,8 @@ object DistriOptimizerV2 extends AbstractOptimizer { cacheOfMaster: MasterCache[T], context: TrainingContext[T], trainingTrace: TrainingTrace )(implicit ev: TensorNumeric[T]): Unit = { - val lossSum = sc.accumulator(0.0, "loss sum") - val recordsNum = sc.accumulator(0, "record number") + val lossSum = sc.doubleAccumulator("loss sum") + val recordsNum = sc.doubleAccumulator("record number") val metrics = cacheOfMaster.metrics val partitionNum = cacheOfMaster.partitionNum initMetrics(sc, metrics, partitionNum) @@ -202,8 +202,8 @@ object DistriOptimizerV2 extends AbstractOptimizer { val results = train(cached, miniBatchBuffer, context, metrics) - lossSum += results.loss - recordsNum += results.records + lossSum.add(results.loss) + recordsNum.add(results.records) Iterator.single(results.successed) }.reduce(_ + _) @@ -211,7 +211,7 @@ object DistriOptimizerV2 extends AbstractOptimizer { parameterSync(lossSum.value, successModels, cacheOfMaster, models, context) }) - driverStatesUpdate(cacheOfMaster, recordsNum.value, + driverStatesUpdate(cacheOfMaster, (recordsNum.value).toInt, context, trainingTrace, metrics) } @@ -240,10 +240,6 @@ object DistriOptimizerV2 extends AbstractOptimizer { parameterProcessers: Array[ParameterProcessor] ) - case class Replica(model: Module[T], weights: Tensor[T], gradients: Tensor[T], - criterion: Criterion[T], state: Table, - validationMethods: Option[Array[ValidationMethod[T]]]) - val config = TrainingConfig( cacheOfMaster.criterion, cacheOfMaster.validationMethods, @@ -1056,6 +1052,10 @@ private case object AGGREGATE_PARTITION_GRADIENT extends MetricEntry("aggregrate // scalastyle:on private case object SEND_WEIGHTS_AVERAGE extends MetricEntry("send weights average") +private case class Replica[T](model: Module[T], weights: Tensor[T], gradients: Tensor[T], + criterion: Criterion[T], state: Table, + validationMethods: Option[Array[ValidationMethod[T]]]) + private class TrainingTrace( private var _records: Int = 0, private var _iterations: Int = 0, diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/Metrics.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/Metrics.scala index 
88e839065c3..65393ad5861 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/Metrics.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/Metrics.scala @@ -17,7 +17,8 @@ package com.intel.analytics.bigdl.optim import com.google.common.util.concurrent.AtomicDouble -import org.apache.spark.{Accumulable, Accumulator, SparkContext} +import org.apache.spark.SparkContext +import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator} import scala.collection.mutable.{ArrayBuffer, Map} @@ -41,11 +42,12 @@ class Metrics extends Serializable { } if (aggregateDistributeMetricsMap.contains(name)) { - aggregateDistributeMetricsMap(name).value += value + aggregateDistributeMetricsMap(name).value.add(value) } if (distributeMetricsMap.contains(name)) { - distributeMetricsMap(name).value += value + var bufferValue = ArrayBuffer(value) + distributeMetricsMap(name).value.add(bufferValue) } this } @@ -65,11 +67,13 @@ class Metrics extends Serializable { def set(name: String, value: Double, sc: SparkContext, parallel: Int): this.type = { require(!localMetricsMap.contains(name), "duplicated local metric") if (aggregateDistributeMetricsMap.contains(name)) { - aggregateDistributeMetricsMap(name).value.setValue(value) + aggregateDistributeMetricsMap(name).value.reset() + aggregateDistributeMetricsMap(name).value.add(value) aggregateDistributeMetricsMap(name).parallel = parallel } else { aggregateDistributeMetricsMap(name) = - AggregateDistributeMetricsEntry(sc.accumulator(value, name), parallel) + AggregateDistributeMetricsEntry(sc.doubleAccumulator(name), parallel) + aggregateDistributeMetricsMap(name).value.add(value) } this } @@ -78,9 +82,13 @@ class Metrics extends Serializable { require(!localMetricsMap.contains(name), "duplicated local metric") require(!aggregateDistributeMetricsMap.contains(name), "duplicated distribute metric") if (distributeMetricsMap.contains(name)) { - distributeMetricsMap(name).value.setValue(value) + distributeMetricsMap(name).value.reset() + distributeMetricsMap(name).value.add(value) } else { - distributeMetricsMap(name) = DistributeMetricsEntry(sc.accumulableCollection(value)) + val accumulableCollection = new ArrayBufferAccumulator + sc.register(accumulableCollection) + distributeMetricsMap(name) = DistributeMetricsEntry(accumulableCollection) + distributeMetricsMap(name).value.add(value) } this } @@ -118,6 +126,36 @@ class Metrics extends Serializable { private case class LocalMetricsEntry(value: AtomicDouble, var parallel: Int) -private case class AggregateDistributeMetricsEntry(value: Accumulator[Double], var parallel: Int) +private case class AggregateDistributeMetricsEntry(value: DoubleAccumulator, var parallel: Int) -private case class DistributeMetricsEntry(value: Accumulable[ArrayBuffer[Double], Double]) +private case class DistributeMetricsEntry(value: ArrayBufferAccumulator) + +class ArrayBufferAccumulator extends AccumulatorV2[ArrayBuffer[Double], ArrayBuffer[Double]] { + private var values = new ArrayBuffer[Double]() + + def reset(): Unit = { + values.clear() + } + + def value: ArrayBuffer[Double] = values + + def add(v: ArrayBuffer[Double]): Unit = { + values ++= v + } + + def copy(): ArrayBufferAccumulator = { + val newArrayBufferAccumulator = new ArrayBufferAccumulator + newArrayBufferAccumulator.values = this.values + newArrayBufferAccumulator + } + + def isZero: Boolean = {values.isEmpty} + + def merge(other: AccumulatorV2[ArrayBuffer[Double], ArrayBuffer[Double]]): Unit = other match { + case o: ArrayBufferAccumulator => values 
++= o.values + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}" + ) + } + +} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/PredictionService.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/PredictionService.scala index b027f2346a5..70dea48f0ff 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/PredictionService.scala +++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/optim/PredictionService.scala @@ -94,11 +94,12 @@ class PredictionService[T: ClassTag] private[optim]( tensor.clone() case table: Table => val clonedMap = mutable.HashMap[Any, Any]() - table.getState().foreach { + table.getState().foreach { x => (x: @unchecked) match { case (k: Tensor[_], v: Tensor[_]) => clonedMap += k.clone() -> v.clone() case (k, v: Tensor[_]) => clonedMap += k -> v.clone() + } } new Table(clonedMap) } @@ -190,32 +191,32 @@ object PredictionService { val tensorState: Array[(Tensor[_], Tensor[_])] = firstKey match { case _: Tensor[_] => keyIsPrimitive = false - table.getState().map { case (k: Tensor[_], v: Tensor[_]) => - k -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Tensor[_], v: Tensor[_]) => + k -> v }}.toArray case _: Int => - table.getState().map { case (k: Int, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Int, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Long => - table.getState().map { case (k: Long, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Long, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Char => - table.getState().map { case (k: Char, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Char, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Short => - table.getState().map { case (k: Short, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map {x => (x: @unchecked) match { case (k: Short, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Float => - table.getState().map { case (k: Float, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Float, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Double => - table.getState().map { case (k: Double, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Double, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: Boolean => - table.getState().map { case (k: Boolean, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: Boolean, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case _: String => - table.getState().map { case (k: String, v: Tensor[_]) => - Tensor.scalar(k) -> v }.toArray + table.getState().map { x => (x: @unchecked) match { case (k: String, v: Tensor[_]) => + Tensor.scalar(k) -> v }}.toArray case key => throw new UnsupportedOperationException(s"Unsupported Table key: $key!") } diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/caffe/CaffeLoader.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/caffe/CaffeLoader.scala index 95946d17d62..bc20ba9211d 100644 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/caffe/CaffeLoader.scala +++ 
b/spark/dl/src/main/scala/com/intel/analytics/bigdl/utils/caffe/CaffeLoader.scala @@ -259,6 +259,8 @@ class CaffeLoader[T: ClassTag](prototxtPath: String, modelPath: String, parameterTable.foreach { case (name: String, params: Table) => copyParameter(name, params) + case _ => + throw new UnsupportedOperationException("unsupported $name and $params") } model } diff --git a/spark/dl/src/test/integration-test.robot b/spark/dl/src/test/integration-test.robot index 53d8e4033fe..a82b20225d1 100644 --- a/spark/dl/src/test/integration-test.robot +++ b/spark/dl/src/test/integration-test.robot @@ -12,7 +12,8 @@ Test template BigDL Test 4 Spark2.3 on Yarn Test Suite 5 Quantization Test Suite 6 PySpark2.2 Test Suite - +7 PySpark3.0 Test Suite +8 Spark3.0 on Yarn Test Suite *** Keywords *** Build SparkJar @@ -112,6 +113,9 @@ Spark1.6 on Yarn Test Suite Spark2.3 on Yarn Test Suite Yarn Test Suite spark_2.x /opt/work/spark-2.3.1-bin-hadoop2.7 +Spark3.0 on Yarn Test Suite + Yarn Test Suite spark_3.x /opt/work/spark-3.0.0-bin-hadoop2.7 + Yarn Test Suite [Arguments] ${bigdl_spark_version} ${spark_home} DownLoad Input @@ -147,4 +151,8 @@ PySpark2.2 Test Suite ${submit}= Catenate SEPARATOR=/ /opt/work/spark-2.2.0-bin-hadoop2.7/bin spark-submit Run Shell ${submit} --master ${spark_22_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 10g --executor-cores 14 --total-executor-cores 28 --py-files ${curdir}/dist/lib/bigdl-${version}-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/bigdl/models/lenet/lenet5.py -b 224 --action train --endTriggerType epoch --endTriggerNum 1 - +PySpark3.0 Test Suite + Build SparkJar spark_3.x + Set Environment Variable SPARK_HOME /opt/work/spark-3.0.0-bin-hadoop2.7 + ${submit}= Catenate SEPARATOR=/ /opt/work/spark-3.0.0-bin-hadoop2.7/bin spark-submit + Run Shell ${submit} --master ${spark_30_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 10g --executor-cores 14 --total-executor-cores 28 --py-files ${curdir}/dist/lib/bigdl-${version}-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/bigdl/models/lenet/lenet5.py -b 224 --action train --endTriggerType epoch --endTriggerNum 1 diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/integration/torch/LSTMPeepholeSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/integration/torch/LSTMPeepholeSpec.scala index c618143c837..49de2f305a2 100644 --- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/integration/torch/LSTMPeepholeSpec.scala +++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/integration/torch/LSTMPeepholeSpec.scala @@ -668,6 +668,8 @@ class LSTMPeepholeSpec extends TorchRNNSpec { assert(abs(v1 - v2) <= 1e-8) v1 }) + case _ => + throw new UnsupportedOperationException("unsupported $key and $value type") } luaOutput.map(output, (v1, v2) => { diff --git a/spark/spark-version/3.0/pom.xml b/spark/spark-version/3.0/pom.xml new file mode 100644 index 00000000000..e65f16912dc --- /dev/null +++ b/spark/spark-version/3.0/pom.xml @@ -0,0 +1,55 @@ + + + + spark-version + com.intel.analytics.bigdl + 0.12.0-SNAPSHOT + + 4.0.0 + + com.intel.analytics.bigdl.spark-version + 3.0 + jar + + + + org.apache.spark + spark-core_2.12 + ${spark.version} + provided + + + org.apache.spark + spark-mllib_2.12 + ${spark.version} + provided + + + + + + org.scalastyle + scalastyle-maven-plugin + 0.8.0 + + false + true + false + 
${basedir}/src/main/scala + ${project.parent.parent.parent.basedir}/scalastyle_config.xml + ${project.build.directory}/stylecheck/scalastyle-output.xml + UTF-8 + + + + + check + + + + + + + diff --git a/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLEstimatorBase.scala b/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLEstimatorBase.scala new file mode 100644 index 00000000000..bea73e45628 --- /dev/null +++ b/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLEstimatorBase.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml + +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.shared.HasLabelCol +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Dataset, Row} + +/** + * Handle different Vector types in Spark 1.5/1.6 and Spark 2.0+. + * Support both ML Vector and MLlib Vector for Spark 2.0+. + */ +trait VectorCompatibility { + + val validVectorTypes = Seq(new VectorUDT, new org.apache.spark.mllib.linalg.VectorUDT) + + def getVectorSeq(row: Row, colType: DataType, index: Int): Seq[AnyVal] = { + if (colType == new VectorUDT) { + row.getAs[Vector](index).toArray.toSeq + } else if (colType == new org.apache.spark.mllib.linalg.VectorUDT) { + row.getAs[org.apache.spark.mllib.linalg.Vector](index).toArray.toSeq + } else { + throw new IllegalArgumentException( + s"$colType is not a supported vector type.") + } + } +} + + +/** + *A wrapper from org.apache.spark.ml.Estimator + * Extends MLEstimator and override process to gain compatibility with + * both spark 1.5 and spark 2.0. + */ +abstract class DLEstimatorBase[Learner <: DLEstimatorBase[Learner, M], + M <: DLTransformerBase[M]] + extends Estimator[M] with HasLabelCol { + + protected def internalFit(dataFrame: DataFrame): M + + override def fit(dataset: Dataset[_]): M = { + transformSchema(dataset.schema, logging = true) + internalFit(dataset.toDF()) + } + + override def copy(extra: ParamMap): Learner = defaultCopy(extra) + +} + + + diff --git a/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLTransformerBase.scala b/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLTransformerBase.scala new file mode 100644 index 00000000000..6be20bfa163 --- /dev/null +++ b/spark/spark-version/3.0/src/main/scala/org/apache/spark/ml/DLTransformerBase.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml + +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Dataset} + +/** + * A wrapper for org.apache.spark.ml.Transformer. + * Extends MlTransformer and override process to gain compatibility with + * both spark 1.5 and spark 2.0. + */ +abstract class DLTransformerBase[M <: DLTransformerBase[M]] + extends Model[M] { + + /** + * convert feature columns(MLlib Vectors or Array) to Seq format + */ + protected def internalTransform(dataFrame: DataFrame): DataFrame + + override def transform(dataset: Dataset[_]): DataFrame = { + transformSchema(dataset.schema, logging = true) + internalTransform(dataset.toDF()) + } + + override def copy(extra: ParamMap): M = defaultCopy(extra) +} diff --git a/spark/spark-version/3.0/src/main/scala/org/apache/spark/rdd/ZippedPartitionsWithLocalityRDD.scala b/spark/spark-version/3.0/src/main/scala/org/apache/spark/rdd/ZippedPartitionsWithLocalityRDD.scala new file mode 100644 index 00000000000..d2643079f58 --- /dev/null +++ b/spark/spark-version/3.0/src/main/scala/org/apache/spark/rdd/ZippedPartitionsWithLocalityRDD.scala @@ -0,0 +1,135 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.io.{IOException, ObjectOutputStream} + +import org.apache.log4j.Logger +import org.apache.spark.util.Utils +import org.apache.spark.{Partition, SparkContext} + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +object ZippedPartitionsWithLocalityRDD { + def apply[T: ClassTag, B: ClassTag, V: ClassTag] + (rdd1: RDD[T], rdd2: RDD[B], preservesPartitioning: Boolean = false) + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = rdd1.withScope { + val sc = rdd1.sparkContext + new ZippedPartitionsWithLocalityRDD( + sc, sc.clean(f), rdd1, rdd2, preservesPartitioning) + } + + val logger: Logger = Logger.getLogger(getClass) +} + +/** + * Prefer to zip partitions of rdd1 and rdd2 in the same location. + * Remaining partitions not in same location will be zipped by order. + * For example: + * Say we have two RDDs, rdd1 and rdd2. The first partition of rdd1 is on node A, and the second + * is on node B. The first partition of rdd2 is on node B and the second one is on node A. + * If we just use rdd1.zipPartition(rdd2), the result will be the first partition of rdd1 is + * zipped with the first partition of rdd2, so there will be cross node communication. This is + * bad for performance. That's why we introduce the ZippedPartitionsWithLocalityRDD. + * In our method, the first partition of rdd1 will be zipped with the second partition of rdd2, + * as they are on the same node. This will reduce the network communication cost and result in + * a better performance. 
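+ *
+ * A typical call site (rdd names here are illustrative) looks like:
+ * {{{
+ *   val zipped = ZippedPartitionsWithLocalityRDD(modelRdd, dataRdd) {
+ *     (modelIter, dataIter) => modelIter.zip(dataIter)
+ *   }
+ * }}}
+ * The zip function is the same one passed to RDD.zipPartitions; only the pairing of
+ * partitions changes, preferring partitions that share a location.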
+ * @param sc spark context + * @param _f + * @param _rdd1 + * @param _rdd2 + * @param preservesPartitioning + */ +class ZippedPartitionsWithLocalityRDD[A: ClassTag, B: ClassTag, V: ClassTag]( + sc: SparkContext, + _f: (Iterator[A], Iterator[B]) => Iterator[V], + _rdd1: RDD[A], + _rdd2: RDD[B], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsRDD2[A, B, V](sc, _f, _rdd1, _rdd2, preservesPartitioning) { + + override def getPartitions: Array[Partition] = { + require(rdds.length == 2, "this is only for 2 rdd zip") + val numParts = rdds.head.partitions.length + if (!rdds.forall(rdd => rdd.partitions.length == numParts)) { + throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") + } + + val candidateLocs = new ArrayBuffer[(Int, Seq[String])]() + (0 until numParts).foreach(p => { + candidateLocs.append((p, rdds(1) + .context.getPreferredLocs(rdds(1), p) + .map(_.toString).distinct)) + }) + val nonmatchPartitionId = new ArrayBuffer[Int]() + val parts = new Array[Partition](numParts) + + (0 until numParts).foreach { i => + val curPrefs = rdds(0).context.getPreferredLocs(rdds(0), i).map(_.toString).distinct + var p = 0 + var matchPartition: (Int, Seq[String]) = null + var locs: Seq[String] = null + while (p < candidateLocs.length) { + locs = candidateLocs(p)._2.intersect(curPrefs) + if (!locs.isEmpty) { + matchPartition = candidateLocs.remove(p) + p = Integer.MAX_VALUE - 1 + } + p += 1 + } + if (matchPartition != null) { + parts(i) = + new ZippedPartitionsLocalityPartition(i, Array(i, matchPartition._1), rdds, locs) + } else { + ZippedPartitionsWithLocalityRDD.logger.warn(s"can't find locality partition" + + s"for partition $i Partition locations are (${curPrefs}) Candidate partition" + + s" locations are\n" + s"${candidateLocs.mkString("\n")}.") + nonmatchPartitionId.append(i) + } + } + + require(nonmatchPartitionId.size == candidateLocs.size, + "unmatched partition size should be the same with candidateLocs size") + nonmatchPartitionId.foreach { i => + val locs = rdds(0).context.getPreferredLocs(rdds(0), i).map(_.toString).distinct + val matchPartition = candidateLocs.remove(0) + parts(i) = new ZippedPartitionsLocalityPartition(i, Array(i, matchPartition._1), rdds, locs) + } + parts + } +} + + +private[spark] class ZippedPartitionsLocalityPartition( + idx: Int, + @transient val indexes: Seq[Int], + @transient val rdds: Seq[RDD[_]], + @transient override val preferredLocations: Seq[String]) + extends ZippedPartitionsPartition(idx, rdds, preferredLocations) { + + override val index: Int = idx + var _partitionValues = rdds.zip(indexes).map{ case (rdd, i) => rdd.partitions(i) } + override def partitions: Seq[Partition] = _partitionValues + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { + // Update the reference to parent split at the time of task serialization + _partitionValues = rdds.zip(indexes).map{ case (rdd, i) => rdd.partitions(i) } + oos.defaultWriteObject() + } +} diff --git a/spark/spark-version/3.0/src/main/scala/org/apache/spark/sql/SqlAdapter.scala b/spark/spark-version/3.0/src/main/scala/org/apache/spark/sql/SqlAdapter.scala new file mode 100644 index 00000000000..724b719ed36 --- /dev/null +++ b/spark/spark-version/3.0/src/main/scala/org/apache/spark/sql/SqlAdapter.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import org.apache.spark.sql.expressions.SparkUserDefinedFunction +import org.apache.spark.sql.types.DataType + +object SqlAdapter { + + def getUDF(f: AnyRef, dataType: DataType): SparkUserDefinedFunction = { + SparkUserDefinedFunction(f, dataType) + } + +} diff --git a/spark/spark-version/3.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala b/spark/spark-version/3.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala new file mode 100644 index 00000000000..116d6f248ba --- /dev/null +++ b/spark/spark-version/3.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala @@ -0,0 +1,162 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.lang.{Boolean => JBoolean} +import java.nio.ByteBuffer + +import org.apache.spark.SparkEnv +import org.apache.spark.util.io.ChunkedByteBuffer + +import scala.reflect.ClassTag + +object BlockManagerWrapper { + + def putBytes( blockId: BlockId, + bytes: ByteBuffer, + level: StorageLevel): Unit = { + require(bytes != null, "Bytes is null") + putBytesFn(blockId, new ChunkedByteBuffer(bytes), level) + } + + def getLocal(blockId: BlockId): Option[BlockResult] = { + SparkEnv.get.blockManager.getLocalValues(blockId) + } + + def putSingle(blockId: BlockId, + value: Any, + level: StorageLevel, + tellMaster: Boolean = true): Unit = { + SparkEnv.get.blockManager.putSingle(blockId, value, level, tellMaster) + } + + def removeBlock(blockId: BlockId): Unit = { + SparkEnv.get.blockManager.removeBlock(blockId) + } + + def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = { + getLocalBytesFn(blockId) + } + + def getLocalOrRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { + val maybeLocalBytes = getLocalBytesFn(blockId) + if (maybeLocalBytes.isDefined) { + maybeLocalBytes + } else { + SparkEnv.get.blockManager.getRemoteBytes(blockId).map(_.toByteBuffer) + } + } + + def unlock(blockId : BlockId): Unit = { + val blockInfoManager = SparkEnv.get.blockManager.blockInfoManager + if (blockInfoManager.get(blockId).isDefined) { + unlockFn(blockId) + } + } + + private val getLocalBytesFn: (BlockId) => Option[ByteBuffer] = { + val bmClass = classOf[BlockManager] + val getLocalBytesMethod = bmClass.getMethod("getLocalBytes", classOf[BlockId]) + + // Spark versions before 2.2.0 declare: + // def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] + // Spark 2.2.0+ declares: + // def getLocalBytes(blockId: BlockId): Option[BlockData] + // Because the latter change happened in the 
commit that introduced BlockData, + // and because you can't discover the generic type of the return type by reflection, + // distinguish the cases by seeing if BlockData exists. + try { + val blockDataClass = Class.forName("org.apache.spark.storage.BlockData") + // newer method, apply reflection to transform BlockData after invoking + val toByteBufferMethod = blockDataClass.getMethod("toByteBuffer") + (blockId: BlockId) => + getLocalBytesMethod.invoke(SparkEnv.get.blockManager, blockId) + .asInstanceOf[Option[_]] + .map(blockData => toByteBufferMethod.invoke(blockData).asInstanceOf[ByteBuffer]) + } catch { + case _: ClassNotFoundException => + // older method, can be invoked directly + (blockId: BlockId) => + getLocalBytesMethod.invoke(SparkEnv.get.blockManager, blockId) + .asInstanceOf[Option[ChunkedByteBuffer]] + .map(_.toByteBuffer) + } + } + + private val putBytesFn: (BlockId, ChunkedByteBuffer, StorageLevel) => Unit = { + val bmClass = classOf[BlockManager] + // Spark 2.0.0 - 2.1.0, and 2.2.0+ (as of this writing), declare the method: + // def putBytes[T: ClassTag]( + // blockId: BlockId, + // bytes: ChunkedByteBuffer, + // level: StorageLevel, + // tellMaster: Boolean = true): Boolean + val putBytesMethod = + try { + bmClass.getMethod("putBytes", + classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel], + classOf[Boolean], classOf[ClassTag[_]]) + } catch { + case _: NoSuchMethodException => + // But Spark 2.1.1 and distros like Cloudera 2.0.0 / 2.1.0 had an extra boolean + // param: + // def putBytes[T: ClassTag]( + // blockId: BlockId, + // bytes: ChunkedByteBuffer, + // level: StorageLevel, + // tellMaster: Boolean = true, + // encrypt: Boolean = false): Boolean + bmClass.getMethod("putBytes", + classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel], + classOf[Boolean], classOf[Boolean], classOf[ClassTag[_]]) + } + putBytesMethod.getParameterTypes.length match { + case 5 => + (blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) => + putBytesMethod.invoke(SparkEnv.get.blockManager, + blockId, bytes, level, JBoolean.TRUE, null) + case 6 => + (blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) => + putBytesMethod.invoke(SparkEnv.get.blockManager, + blockId, bytes, level, JBoolean.TRUE, JBoolean.FALSE, null) + } + } + + private val unlockFn: (BlockId) => Unit = { + val bimClass = classOf[BlockInfoManager] + // Spark 2.0.0-2.0.2, 2.1.0-2.1.1 declare: + // def unlock(blockId: BlockId): Unit + val unlockMethod = + try { + bimClass.getMethod("unlock", classOf[BlockId]) + } catch { + case _: NoSuchMethodException => + // But 2.0.3+, 2.1.2+, 2.2.0+ declare: + // def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit + bimClass.getMethod("unlock", classOf[BlockId], classOf[Option[_]]) + } + unlockMethod.getParameterTypes.length match { + case 1 => + (blockId: BlockId) => + unlockMethod.invoke(SparkEnv.get.blockManager.blockInfoManager, blockId) + case 2 => + (blockId: BlockId) => + unlockMethod.invoke(SparkEnv.get.blockManager.blockInfoManager, blockId, None) + } + } + +} diff --git a/spark/spark-version/pom.xml b/spark/spark-version/pom.xml index dbb5ac6555e..f5e0ac30403 100644 --- a/spark/spark-version/pom.xml +++ b/spark/spark-version/pom.xml @@ -22,7 +22,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.0.0 + 3.2.1
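The Metrics.scala change above replaces sc.accumulableCollection with a custom
AccumulatorV2 (ArrayBufferAccumulator). A minimal, self-contained sketch of that
registration pattern is shown below; the object name and the local master setting
are illustrative assumptions, not code from this patch.

import com.intel.analytics.bigdl.optim.ArrayBufferAccumulator
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object ArrayBufferAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-v2-sketch"))
    // AccumulatorV2 instances must be registered with the SparkContext before use;
    // this replaces the removed sc.accumulableCollection(ArrayBuffer[Double]()).
    val times = new ArrayBufferAccumulator
    sc.register(times, "computing time for each node")
    sc.parallelize(1 to 4, 2).foreach(i => times.add(ArrayBuffer(i * 1.0)))
    // Per-task copies are merged back into the registered accumulator on the driver;
    // value exposes the collected buffer.
    println(times.value.sorted.mkString(", "))
    sc.stop()
  }
}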