diff --git a/scala/common/utils/src/main/scala/org/apache/spark/ml/DLClassifier.scala b/scala/common/utils/src/main/scala/org/apache/spark/ml/DLClassifier.scala deleted file mode 100644 index 689bfc1f8ec..00000000000 --- a/scala/common/utils/src/main/scala/org/apache/spark/ml/DLClassifier.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.ml - -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.{Criterion, Module} -import org.apache.spark.ml.util.Identifiable - -import scala.reflect.ClassTag - -/** - * Deprecated. Please refer to package com.intel.analytics.bigdl.dlframes. - * - * [[DLClassifier]] is a specialized [[DLEstimator]] that simplifies the data format for - * classification tasks. It only supports label column of DoubleType. - * and the fitted [[DLClassifierModel]] will have the prediction column of DoubleType. - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifier` has been migrated to package `com.intel.analytics.bigdl.dlframes`." + - "This will be removed in BigDL 0.6.", "0.5.0") -class DLClassifier[T: ClassTag]( - @transient override val model: Module[T], - override val criterion : Criterion[T], - override val featureSize : Array[Int], - override val uid: String = Identifiable.randomUID("dlClassifier") - )(implicit ev: TensorNumeric[T]) - extends com.intel.analytics.bigdl.dlframes.DLClassifier[T](model, criterion, featureSize) { - - override protected def wrapBigDLModel( - m: Module[T], featureSize: Array[Int]): DLClassifierModel[T] = { - val dlModel = new DLClassifierModel[T](m, featureSize) - copyValues(dlModel.setParent(this)).asInstanceOf[DLClassifierModel[T]] - } -} - -/** - * Deprecated. Please refer to package com.intel.analytics.bigdl.dlframes. - * - * [[DLClassifierModel]] is a specialized [[DLModel]] for classification tasks. - * The prediction column will have the datatype of Double. - * - * @param model BigDL module to be optimized - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifierModel` is migrated to package `com.intel.analytics.bigdl.dlframes`." 
+ - "This will be removed in BigDL 0.6.", "0.5.0") -class DLClassifierModel[T: ClassTag]( - @transient override val model: Module[T], - featureSize : Array[Int], - override val uid: String = "DLClassifierModel" - )(implicit ev: TensorNumeric[T]) - extends com.intel.analytics.bigdl.dlframes.DLClassifierModel[T](model, featureSize) diff --git a/scala/common/utils/src/main/scala/org/apache/spark/ml/DLEstimator.scala b/scala/common/utils/src/main/scala/org/apache/spark/ml/DLEstimator.scala deleted file mode 100644 index 02ea21f577d..00000000000 --- a/scala/common/utils/src/main/scala/org/apache/spark/ml/DLEstimator.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.ml - -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.{Criterion, Module} - -import scala.reflect.ClassTag - - -/** - * Deprecated. Please refer to package com.intel.analytics.bigdl.dlframes. - * - * [[DLEstimator]] helps to train a BigDL Model with the Spark ML Estimator/Transfomer pattern, - * thus Spark users can conveniently fit BigDL into Spark ML pipeline. - * - * [[DLEstimator]] supports feature and label data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * - * User should specify the feature data dimensions and label data dimensions via the constructor - * parameters featureSize and labelSize respectively. Internally the feature and label data are - * converted to BigDL tensors, to further train a BigDL model efficiently. - * - * For details usage, please refer to examples in package - * com.intel.analytics.bigdl.example.MLPipeline - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. e.g. an image may be with - * width * height = 28 * 28, featureSize = Array(28, 28). - * @param labelSize The size (Tensor dimensions) of the label data. - */ -@deprecated("`DLEstimator` has been migrated to package `com.intel.analytics.bigdl.dlframes`." + - "org.apache.spark.ml.DLEstimator will be removed in BigDL 0.6.", "0.5.0") -class DLEstimator[T: ClassTag]( - @transient override val model: Module[T], - override val criterion : Criterion[T], - featureSize : Array[Int], - override val labelSize : Array[Int], - override val uid: String = "DLEstimator")(implicit ev: TensorNumeric[T]) - extends com.intel.analytics.bigdl.dlframes.DLEstimator[T]( - model, criterion, featureSize, labelSize) { - - override protected def wrapBigDLModel(m: Module[T], featureSize: Array[Int]): DLModel[T] = { - val dlModel = new DLModel[T](m, featureSize) - copyValues(dlModel.setParent(this)).asInstanceOf[DLModel[T]] - } -} - - -/** - * Deprecated. Please refer to package com.intel.analytics.bigdl.dlframes. 
- * - * [[DLModel]] helps embed a BigDL model into a Spark Transformer, thus Spark users can - * conveniently merge BigDL into Spark ML pipeline. - * [[DLModel]] supports feature data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * Internally [[DLModel]] use features column as storage of the feature data, and create - * Tensors according to the constructor parameter featureSize. - * - * [[DLModel]] is compatible with both spark 1.5-plus and 2.0 by extending ML Transformer. - * @param model trainned BigDL models to use in prediction. - * @param featureSize The size (Tensor dimensions) of the feature data. (e.g. an image may be with - * featureSize = 28 * 28). - */ -@deprecated("`DLModel` has been migrated to package `com.intel.analytics.bigdl.dlframes`." + - "This will be removed in BigDL 0.6.", "0.5.0") -class DLModel[T: ClassTag]( - @transient override val model: Module[T], - featureSize : Array[Int], - override val uid: String = "DLModel" - )(implicit ev: TensorNumeric[T]) - extends com.intel.analytics.bigdl.dlframes.DLModel[T](model, featureSize) - - -// TODO, add save/load -object DLModel { - -} - diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLeNet.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLeNet.scala deleted file mode 100644 index c472a93c729..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLeNet.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.intel.analytics.bigdl.example.MLPipeline - -import com.intel.analytics.bigdl._ -import com.intel.analytics.bigdl.dataset.image.{BytesToGreyImg, GreyImgNormalizer, GreyImgToBatch} -import com.intel.analytics.bigdl.dataset.{DataSet, DistributedDataSet, MiniBatch, _} -import com.intel.analytics.bigdl.dlframes.DLClassifier -import com.intel.analytics.bigdl.models.lenet.LeNet5 -import com.intel.analytics.bigdl.models.lenet.Utils._ -import com.intel.analytics.bigdl.nn.ClassNLLCriterion -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter} -import org.apache.log4j.{Level, Logger} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext - -/** - * An example to show how to use DLEstimator fit to be compatible with ML Pipeline - * refer to README.md on how to run this example - */ -object DLClassifierLeNet { - - LoggerFilter.redirectSparkInfoLogs() - - def main(args: Array[String]): Unit = { - val inputs = Array[String]("Feature data", "Label data") - trainParser.parse(args, new TrainParams()).foreach(param => { - val conf = Engine.createSparkConf() - .setAppName("MLPipeline Example") - .set("spark.task.maxFailures", "1") - val sc = new SparkContext(conf) - val sqLContext = SQLContext.getOrCreate(sc) - Engine.init - - val trainData = param.folder + "/train-images-idx3-ubyte" - val trainLabel = param.folder + "/train-labels-idx1-ubyte" - val validationData = param.folder + "/t10k-images-idx3-ubyte" - val validationLabel = param.folder + "/t10k-labels-idx1-ubyte" - - val trainSet = DataSet.array(load(trainData, trainLabel), sc) -> - BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(1) - - val trainingRDD : RDD[Data[Float]] = trainSet. - asInstanceOf[DistributedDataSet[MiniBatch[Float]]].data(false).map(batch => { - val feature = batch.getInput().asInstanceOf[Tensor[Float]] - val label = batch.getTarget().asInstanceOf[Tensor[Float]] - Data[Float](feature.storage().array(), label.storage().array()) - }) - val trainingDF = sqLContext.createDataFrame(trainingRDD).toDF(inputs: _*) - - val model = LeNet5(classNum = 10) - val criterion = ClassNLLCriterion[Float]() - val featureSize = Array(28, 28) - val estimator = new DLClassifier[Float](model, criterion, featureSize) - .setFeaturesCol(inputs(0)) - .setLabelCol(inputs(1)) - .setBatchSize(param.batchSize) - .setMaxEpoch(param.maxEpoch) - val transformer = estimator.fit(trainingDF) - - val validationSet = DataSet.array(load(validationData, validationLabel), sc) -> - BytesToGreyImg(28, 28) -> GreyImgNormalizer(testMean, testStd) -> GreyImgToBatch(1) - - val validationRDD: RDD[Data[Float]] = validationSet. 
- asInstanceOf[DistributedDataSet[MiniBatch[Float]]].data(false).map{batch => - val feature = batch.getInput().asInstanceOf[Tensor[Float]] - val label = batch.getTarget().asInstanceOf[Tensor[Float]] - Data[Float](feature.storage().array(), label.storage().array()) - } - val validationDF = sqLContext.createDataFrame(validationRDD).toDF(inputs: _*) - val transformed = transformer.transform(validationDF) - transformed.show() - sc.stop() - }) - } -} - -private case class Data[T](featureData : Array[T], labelData : Array[T]) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLogisticRegression.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLogisticRegression.scala deleted file mode 100644 index 7d6f5553dbf..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLClassifierLogisticRegression.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.example.MLPipeline - -import com.intel.analytics.bigdl.dlframes.DLClassifier -import com.intel.analytics.bigdl.nn.{ClassNLLCriterion, Linear, LogSoftMax, Sequential} -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -/** - * Logistic Regression with BigDL layers and DLClassifier - */ -object DLClassifierLogisticRegression { - - def main(args: Array[String]): Unit = { - val conf = Engine.createSparkConf() - .setAppName("DLClassifierLogisticRegression") - .setMaster("local[1]") - val sc = new SparkContext(conf) - val sqlContext = SQLContext.getOrCreate(sc) - Engine.init - - val model = Sequential().add(Linear(2, 2)).add(LogSoftMax()) - val criterion = ClassNLLCriterion() - val estimator = new DLClassifier(model, criterion, Array(2)) - .setBatchSize(4) - .setMaxEpoch(10) - val data = sc.parallelize(Seq( - (Array(0.0, 1.0), 1.0), - (Array(1.0, 0.0), 2.0), - (Array(0.0, 1.0), 1.0), - (Array(1.0, 0.0), 2.0))) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - dlModel.transform(df).show(false) - } -} diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLEstimatorMultiLabelLR.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLEstimatorMultiLabelLR.scala deleted file mode 100644 index 59d5f2e7701..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/DLEstimatorMultiLabelLR.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.example.MLPipeline - -import com.intel.analytics.bigdl.dlframes.DLEstimator -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.optim.LBFGS -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericDouble -import com.intel.analytics.bigdl.utils.Engine -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -/** - * Multi-label regression with BigDL layers and DLEstimator - */ -object DLEstimatorMultiLabelLR { - - def main(args: Array[String]): Unit = { - val conf = Engine.createSparkConf() - .setAppName("DLEstimatorMultiLabelLR") - .setMaster("local[1]") - val sc = new SparkContext(conf) - val sqlContext = SQLContext.getOrCreate(sc) - Engine.init - - val model = Sequential().add(Linear(2, 2)) - val criterion = MSECriterion() - val estimator = new DLEstimator(model, criterion, Array(2), Array(2)) - .setOptimMethod(new LBFGS[Double]()) - .setLearningRate(1.0) - .setBatchSize(4) - .setMaxEpoch(10) - val data = sc.parallelize(Seq( - (Array(2.0, 1.0), Array(1.0, 2.0)), - (Array(1.0, 2.0), Array(2.0, 1.0)), - (Array(2.0, 1.0), Array(1.0, 2.0)), - (Array(1.0, 2.0), Array(2.0, 1.0)))) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - dlModel.transform(df).show(false) - } -} diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/README.md b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/README.md deleted file mode 100644 index 059ab505df1..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/MLPipeline/README.md +++ /dev/null @@ -1,76 +0,0 @@ -## DLClassifierLogisticRegression - -DLClassifierLogisticRegression example demonstrates how to use BigDL DLClassifier to train a -Logistic Regression Model. DLClassifier extends Spark Estimator and can act as a stage in a -ML Pipeline. The feature column can be Array or Spark Vectors, while the label column data should -be Double. - -## DLEstimatorMultiLabelLR - -DLEstimatorMultiLabelLR example demonstrates how to use BigDL DLEstimator to train a -multi-label Logistic Regression Model. DLEstimator extends Spark Estimator and can act as a -stage in a ML Pipeline. Both the feature and label column can be Array or Spark Vectors. The -feature column may also be Double. - -## DLClassifierLeNet -DLClassifierLeNet example demonstrates how to use BigDL with Spark ML pipeline to train and predict LeNet5 model on MNIST dataset. - -Learn more about Spark ML please refer to -### Preparation - -To start with this example, you need prepare your dataset. - - -1. Prepare dataset - -You can download the MNIST Data from [here](http://yann.lecun.com/exdb/mnist/). Unzip all the -files and put them in one folder(e.g. mnist). - -There're four files. - -**train-images-idx3-ubyte** contains train images. -**train-labels-idx1-ubyte** is train label file. -**t10k-images-idx3-ubyte** has validation images. -**t10k-labels-idx1-ubyte** contains validation labels. 
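Once the four files are in place, the DLClassifierLeNet example wires them into a DLClassifier roughly as follows. This is a condensed sketch of the example code in this patch, not additional API: `trainingDF` and `validationDF` stand for the DataFrames built from the train-* and t10k-* files, and `batchSize`/`maxEpoch` come from the command-line options.
```
import com.intel.analytics.bigdl.dlframes.DLClassifier
import com.intel.analytics.bigdl.models.lenet.LeNet5
import com.intel.analytics.bigdl.nn.ClassNLLCriterion
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat

// LeNet-5 trained with ClassNLLCriterion; each MNIST image is a 28 x 28 feature tensor.
val estimator = new DLClassifier[Float](LeNet5(classNum = 10), ClassNLLCriterion[Float](), Array(28, 28))
  .setFeaturesCol("Feature data")
  .setLabelCol("Label data")
  .setBatchSize(batchSize)
  .setMaxEpoch(maxEpoch)

val dlModel = estimator.fit(trainingDF)    // trains on the train-* files
dlModel.transform(validationDF).show()     // predicts on the t10k-* files
```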
- -For more detail, please refer to the download page. - -### Run this example - -Command to run the example in Spark local mode: -``` -spark-submit \ ---master local[physcial_core_number] \ ---class com.intel.analytics.bigdl.example.MLPipeline.DLClassifierLeNet \ -./dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ --f path_to_mnist_folder \ --b batch_size -``` -Command to run the example in Spark standalone mode: -``` -spark-submit \ ---master spark://... \ ---executor-cores cores_per_executor \ ---total-executor-cores total_cores_for_the_job \ ---driver-class-path dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ ---class com.intel.analytics.bigdl.example.MLPipeline.DLClassifierLeNet \ -dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ --f path_to_mnist_folder \ --b batch_size -``` -Command to run the example in Spark yarn mode: -``` ---master yarn \ ---deploy-mode client \ ---executor-cores cores_per_executor \ ---num-executors executors_number \ ---driver-class-path dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ ---class com.intel.analytics.bigdl.example.MLPipeline.DLClassifierLeNet \ -dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ --f path_to_mnist_folder \ --b batch_size -``` -where - -* -f: where you put your MNIST data -* -b: The mini-batch size. It is expected that the mini-batch size is a multiple of node_number * core_number diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/ImageInference.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/ImageInference.scala deleted file mode 100644 index f5f9041b481..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/ImageInference.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.intel.analytics.bigdl.example.dlframes.imageInference - -import com.intel.analytics.bigdl.dlframes.{DLClassifierModel, DLModel} -import org.apache.spark.sql.DataFrame -import scopt.OptionParser -import com.intel.analytics.bigdl.dataset.Sample -import com.intel.analytics.bigdl.nn.Module -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.transform.vision.image.augmentation._ -import com.intel.analytics.bigdl.transform.vision.image._ -import com.intel.analytics.bigdl.utils.Engine -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext - -object ImageInference { - - def main(args: Array[String]): Unit = { - - val defaultParams = Utils.LocalParams() - Utils.parser.parse(args, defaultParams).map { params => - - val conf = Engine.createSparkConf().setAppName("ModelInference") - val sc = SparkContext.getOrCreate(conf) - val sqlContext = new SQLContext(sc) - Engine.init - - val imagesDF = Utils.loadImages(params.folder, params.batchSize, sqlContext).cache() - - imagesDF.show(10) - imagesDF.printSchema() - - val model = Module.loadCaffeModel[Float](params.caffeDefPath, params.modelPath) - val dlmodel: DLModel[Float] = new DLClassifierModel[Float]( - model, Array(3, 224, 224)) - .setBatchSize(params.batchSize) - .setFeaturesCol("features") - .setPredictionCol("prediction") - - val count = imagesDF.count().toInt - val tranDF = dlmodel.transform(imagesDF.limit(count)) - - tranDF.select("imageName", "prediction").show(100, false) - } - } -} - -object Utils { - - case class LocalParams(caffeDefPath: String = " ", - modelPath: String = " ", - folder: String = " ", - batchSize: Int = 16, - nEpochs: Int = 10 - ) - - val defaultParams = LocalParams() - - val parser = new OptionParser[LocalParams]("BigDL Example") { - opt[String]("caffeDefPath") - .text(s"caffeDefPath") - .action((x, c) => c.copy(caffeDefPath = x)) - opt[String]("modelPath") - .text(s"modelPath") - .action((x, c) => c.copy(modelPath = x)) - opt[String]("folder") - .text(s"folder") - .action((x, c) => c.copy(folder = x)) - opt[Int]('b', "batchSize") - .text(s"batchSize") - .action((x, c) => c.copy(batchSize = x.toInt)) - opt[Int]('e', "nEpochs") - .text("epoch numbers") - .action((x, c) => c.copy(nEpochs = x)) - } - - def loadImages(path: String, partitionNum: Int, sqlContext: SQLContext): DataFrame = { - - val imageFrame: ImageFrame = ImageFrame.read(path, sqlContext.sparkContext) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() -> ImageFrameToSample() - val transformed: ImageFrame = transformer(imageFrame) - val imageRDD = transformed.toDistributed().rdd.map { im => - (im.uri, im[Sample[Float]](ImageFeature.sample).getData()) - } - val imageDF = sqlContext.createDataFrame(imageRDD) - .withColumnRenamed("_1", "imageName") - .withColumnRenamed("_2", "features") - imageDF - } - -} diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/README.md b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/README.md deleted file mode 100644 index 6a837109c3f..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageInference/README.md +++ /dev/null @@ -1,66 +0,0 @@ -## Overview - Deep Learning Frames provides high-level APIs for scalable deep learning in Scala with Apache Spark. 
- The current version of Deep Learning Frames provides a suite of tools around working with and processing images using deep learning. - This exmaple demostrates how to use BigDL to apply popular iamge deep learning models at scale. - -## Image Model Inference - 1. You can apply your own or known popular models to image data to make predictions or transform them into features. - - val imagesDF = loadImages(param.folder, param.batchSize, spark.sqlContext) - val model = Module.loadCaffeModel[Float](param.caffeDefPath, param.modelPath) - val dlmodel: DLModel[Float] = new DLClassifierModel[Float]( - model, Array(3, 224, 224)) - .setBatchSize(param.batchSize) - .setFeaturesCol("features") - .setPredictionCol("predict") - - val tranDF = dlmodel.transform(imagesDF) - - tranDF.select("predict", "imageName").show(5) - - 2. You can run the full ModelInference example by following steps. - - 2.1 Prepare pre-trained model and defenition file. - Download [caffe inception v1](http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel) and [deploy.proxfile](https://github.com/BVLC/caffe/blob/master/models/bvlc_googlenet/deploy.prototxt) - then put the trained model in $modelPath, and set corresponding $caffeDefPath. - - 2.2 Prepare predict dataset - Put your image data for prediction in the ./predict folder. Alternatively, you may also use imagenet-2012 validation dataset to run the example, which can be found from . After you download the file (ILSVRC2012_img_val.tar), run the follow commands to prepare the data. - - ```bash - mkdir predict - tar -xvf ILSVRC2012_img_val.tar -C ./folder/ - ``` - - 2.3 Run this example - - Command to run the example in Spark local mode: - ``` - spark-submit \ - --master local[physcial_core_number] \ - --driver-memory 10g --executor-memory 20g \ - --class com.intel.analytics.bigdl.example.DLFrames.ImageInference \ - ./dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ - --modelPath ./model/bvlc_googlenet.caffemodel \ - --caffeDefPath ./model/deploy.prototxt \ - --batchSize 32 \ - --folder ./predict \ - --nEpochs 10 - - ``` - - Command to run the example in Spark yarn mode(TODO): - ``` - spark-submit \ - --master yarn \ - --deploy-mode client \ - --executor-cores 8 \ - --num-executors 4 \ - --class com.intel.analytics.bigdl.example.DLFrames.ImageInference \ - ./dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ - --modelPath ./model/bvlc_googlenet.caffemodel \ - --caffeDefPath ./model/deploy.prototxt \ - --batchSize 32 \ - --folder ./predict \ - --nEpochs 10 - ``` \ No newline at end of file diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/ImageTransferLearning.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/ImageTransferLearning.scala deleted file mode 100644 index ddaa84df580..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/ImageTransferLearning.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.example.dlframes.imageTransferLearning - -import com.intel.analytics.bigdl.dataset.Sample -import com.intel.analytics.bigdl.dlframes.{DLClassifier, DLModel} -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity} -import com.intel.analytics.bigdl.transform.vision.image._ -import com.intel.analytics.bigdl.transform.vision.image.augmentation._ -import com.intel.analytics.bigdl.utils.Engine -import org.apache.log4j.{Level, Logger} -import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator - -import org.apache.spark.ml.{Pipeline, Transformer} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.{DataFrame, SQLContext} -import scopt.OptionParser -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import org.apache.spark.SparkContext - -object ImageTransferLearning { - - def main(args: Array[String]): Unit = { - - val defaultParams = Utils.LocalParams() - - Utils.parser.parse(args, defaultParams).map { params => - - val conf = Engine.createSparkConf().setAppName("TransferLearning") - val sc = SparkContext.getOrCreate(conf) - val sqlContext = new SQLContext(sc) - Engine.init - - val createLabel = udf((name: String) => if (name.contains("cat")) 1.0 else 2.0) - val imagesDF: DataFrame = Utils.loadImages(params.folder, params.batchSize, sqlContext) - .withColumn("label", createLabel(col("imageName"))) - .withColumnRenamed("features", "imageFeatures") - .drop("features") - - val Array(validationDF, trainingDF) = imagesDF.randomSplit(Array(0.20, 0.80), seed = 1L) - - validationDF.persist() - trainingDF.persist() - - val loadedModel = Module - .loadCaffeModel[Float](params.caffeDefPath, params.modelPath) - - val featurizer = new DLModel[Float](loadedModel, Array(3, 224, 224)) - .setBatchSize(params.batchSize) - .setFeaturesCol("imageFeatures") - .setPredictionCol("features") - - val lrModel = Sequential().add(Linear(1000, 2)).add(LogSoftMax()) - - val classifier = new DLClassifier(lrModel, ClassNLLCriterion[Float](), Array(1000)) - .setLearningRate(0.003).setBatchSize(params.batchSize) - .setMaxEpoch(20) - - val pipeline = new Pipeline().setStages( - Array(featurizer, classifier)) - - val pipelineModel = pipeline.fit(trainingDF) - trainingDF.unpersist() - - val predictions = pipelineModel.transform(validationDF) - - predictions.show(200) - predictions.printSchema() - - val evaluation = new MulticlassClassificationEvaluator().setPredictionCol("prediction") - .setMetricName("weightedPrecision").evaluate(predictions) - println("evaluation result on validationDF: " + evaluation) - - validationDF.unpersist() - } - } - -} - - -object Utils { - - case class LocalParams(caffeDefPath: String = " ", - modelPath: String = " ", - folder: String = " ", - batchSize: Int = 16, - nEpochs: Int = 10 - ) - - val defaultParams = LocalParams() - - val parser = new OptionParser[LocalParams]("BigDL Example") { - opt[String]("caffeDefPath") - .text(s"caffeDefPath") - .action((x, c) => c.copy(caffeDefPath = x)) - opt[String]("modelPath") - .text(s"modelPath") - .action((x, c) => c.copy(modelPath = x)) - opt[String]("folder") - .text(s"folder") - .action((x, c) => c.copy(folder = x)) - opt[Int]('b', "batchSize") - .text(s"batchSize") - .action((x, c) => c.copy(batchSize = x.toInt)) - opt[Int]('e', "nEpochs") - .text("epoch numbers") - .action((x, c) => 
c.copy(nEpochs = x)) - } - - def loadImages(path: String, partitionNum: Int, sqlContext: SQLContext): DataFrame = { - - val imageFrame: ImageFrame = ImageFrame.read(path, sqlContext.sparkContext) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() -> ImageFrameToSample() - val transformed: ImageFrame = transformer(imageFrame) - val imageRDD = transformed.toDistributed().rdd.map { im => - (im.uri, im[Sample[Float]](ImageFeature.sample).getData()) - } - val imageDF = sqlContext.createDataFrame(imageRDD) - .withColumnRenamed("_1", "imageName") - .withColumnRenamed("_2", "features") - imageDF - } - -} diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/README.md b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/README.md deleted file mode 100644 index de5b505f353..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/dlframes/imageTransferLearning/README.md +++ /dev/null @@ -1,76 +0,0 @@ -## Overview - Deep Learning Frames provides high-level APIs for scalable deep learning in Scala with Apache Spark. - The current version of Deep Learning Frames provides a suite of tools around working with and processing images using deep learning. - this exmaple demostrates how to use BigDL for transfer learning. - -## Transfer Learning - 1. DLFrames provides utilities to perform transfer learning on images, which is one of the fastest (code and run-time-wise) ways to start using deep learning. - - val imagesDF: DataFrame = Utils.loadImages(params.folder, params.batchSize, spark.sqlContext) - .withColumn("label", createLabel(col("imageName"))) - .withColumnRenamed("features", "imageFeatures") - val Array(validationDF, trainingDF) = imagesDF.randomSplit(Array(0.90, 0.10), seed = 1L) - - val loadedModel: AbstractModule[Activity, Activity, Float] = Module - .loadCaffeModel[Float](params.caffeDefPath, params.modelPath) - val featurizer = new DLModel[Float](loadedModel, Array(3, 224, 224)) - .setFeaturesCol("imageFeatures") - .setPredictionCol("tmp1") - - val lrModel = Sequential().add(Linear(1000, 2)).add(LogSoftMax()) - val classifier = new DLClassifier(lrModel, ClassNLLCriterion[Float](), Array(1000)) - .setLearningRate(0.003).setBatchSize(params.batchSize) - .setMaxEpoch(20) - - val pipeline = new Pipeline().setStages( - Array(featurizer, classifier)) - - val pipelineModel = pipeline.fit(trainingDF) - val predictions = pipelineModel.transform(validationDF) - - 2. You can run the full ImageTransferLearning example by following steps. - - 2.1 Prepare pre-trained model and defenition file. - Download [caffe inception v1](http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel) and [deploy.proxfile](https://github.com/BVLC/caffe/blob/master/models/bvlc_googlenet/deploy.prototxt) - then put the trained model in $modelPath, and set corresponding $caffeDefPath. - - 2.2 Prepare dataset - Put your image data for training and validation in the ./data folder. Alternatively, you may also use kaggle [Dogs vs. Cats](https://www.kaggle.com/c/dogs-vs-cats/data) train dataset to run the example. After you download the file (train.zip), run the follow commands to prepare the data. 
- - ``` - bash - mkdir data - unzip -xvf train.tar -C ./data/ - ``` - - 2.3 Run this example - - Command to run the example in Spark local mode: - ``` - spark-submit \ - --master local[physcial_core_number] \ - --driver-memory 10g --executor-memory 20g \ - --class com.intel.analytics.bigdl.example.DLFrames.imageTransferLearning.ImageTransferLearning \ - ./dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ - --modelPath ./model/bvlc_googlenet.caffemodel \ - --caffeDefPath ./model/deploy.prototxt \ - --batchSize 32 \ - --folder ./data \ - --nEpochs 10 - ``` - - Command to run the example in Spark yarn mode(TODO): - ``` - spark-submit \ - --master yarn \ - --deploy-mode client \ - --executor-cores 8 \ - --num-executors 4 \ - --class com.intel.analytics.bigdl.example.DLFrames.imageTransferLearning.ImageTransferLearning \ - ./dist/lib/bigdl-VERSION-jar-with-dependencies.jar \ - --modelPath ./model/bvlc_googlenet.caffemodel \ - --caffeDefPath ./model/deploy.prototxt \ - --batchSize 32 \ - --folder ./data \ - --nEpochs 10 - ``` \ No newline at end of file diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/imageclassification/ImagePredictor.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/imageclassification/ImagePredictor.scala deleted file mode 100644 index 8500eee8605..00000000000 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/imageclassification/ImagePredictor.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.intel.analytics.bigdl.example.imageclassification - -import java.nio.file.Paths - -import com.intel.analytics.bigdl.dataset.image._ -import com.intel.analytics.bigdl.dlframes.DLClassifierModel -import com.intel.analytics.bigdl.example.imageclassification.MlUtils._ -import com.intel.analytics.bigdl.numeric.NumericFloat -import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter} -import org.apache.log4j.{Level, Logger} -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -/** - * An example to show how to use DLClassifier Transform - */ -object ImagePredictor { - LoggerFilter.redirectSparkInfoLogs() - Logger.getLogger("com.intel.analytics.bigdl.example").setLevel(Level.INFO) - - def main(args: Array[String]): Unit = { - predictParser.parse(args, new PredictParams()).map(param => { - val conf = Engine.createSparkConf() - conf.setAppName("Predict with trained model") - val sc = new SparkContext(conf) - Engine.init - val sqlContext = new SQLContext(sc) - - val partitionNum = Engine.nodeNumber() * Engine.coreNumber() - val model = loadModel(param) - val valTrans = new DLClassifierModel(model, Array(3, imageSize, imageSize)) - .setBatchSize(param.batchSize) - .setFeaturesCol("features") - .setPredictionCol("predict") - - val valRDD = if (param.isHdfs) { - // load image set from hdfs - imagesLoadSeq(param.folder, sc, param.classNum).coalesce(partitionNum, true) - } else { - // load image set from local - val paths = LocalImageFiles.readPaths(Paths.get(param.folder), hasLabel = false) - sc.parallelize(imagesLoad(paths, 256), partitionNum) - } - - val transf = RowToByteRecords() -> - BytesToBGRImg() -> - BGRImgCropper(imageSize, imageSize) -> - BGRImgNormalizer(testMean, testStd) -> - BGRImgToImageVector() - - val valDF = transformDF(sqlContext.createDataFrame(valRDD), transf) - - valTrans.transform(valDF) - .select("imageName", "predict") - .collect() - .take(param.showNum) - .foreach(println) - sc.stop() - }) - } -} diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/utils/python/api/PythonBigDL.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/utils/python/api/PythonBigDL.scala index 42fb5dd913b..b9da56db713 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/utils/python/api/PythonBigDL.scala +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/utils/python/api/PythonBigDL.scala @@ -34,7 +34,6 @@ import java.lang.{Boolean => JBoolean} import java.nio.ByteOrder import com.intel.analytics.bigdl.dataset.image.{CropCenter, CropRandom, CropperMethod} -import com.intel.analytics.bigdl.dlframes._ import com.intel.analytics.bigdl.nn.Graph._ import com.intel.analytics.bigdl.nn.keras.{KerasLayer, KerasModel} import com.intel.analytics.bigdl.optim.SGD.{LearningRateSchedule, SequentialSchedule} @@ -2704,86 +2703,6 @@ class PythonBigDL[T: ClassTag](implicit ev: TensorNumeric[T]) extends Serializab module.quantize() } - def createDLEstimator(model: Module[T], criterion: Criterion[T], - featureSize: JArrayList[Int], - labelSize: JArrayList[Int]): DLEstimator[T] = { - new DLEstimator[T](model, criterion, featureSize.asScala.toArray, labelSize.asScala.toArray) - } - - def createDLClassifier(model: Module[T], criterion: Criterion[T], - featureSize: JArrayList[Int], - labelSize: JArrayList[Int]): DLClassifier[T] = { - new DLClassifier[T](model, criterion, featureSize.asScala.toArray) - } - - def fitEstimator(estimator: DLEstimator[T], dataSet: DataFrame): DLModel[T] = { - estimator.fit(dataSet) - } 
- - def fitClassifier(classifier: DLClassifier[T], dataSet: DataFrame): DLModel[T] = { - classifier.fit(dataSet) - } - - def setBatchSizeDLEstimator(estimator: DLEstimator[T], batchSize: Int): DLEstimator[T] = { - estimator.setBatchSize(batchSize) - } - - def setBatchSizeDLClassifier(classifier: DLClassifier[T], batchSize: Int): DLClassifier[T] = { - classifier.setBatchSize(batchSize) - } - - def setMaxEpochDLEstimator(estimator: DLEstimator[T], maxEpoch: Int): DLEstimator[T] = { - estimator.setMaxEpoch(maxEpoch) - } - - def setMaxEpochDLClassifier(classifier: DLClassifier[T], maxEpoch: Int): DLClassifier[T] = { - classifier.setMaxEpoch(maxEpoch) - } - - def setLearningRateDLEstimator(estimator: DLEstimator[T], lr: Double): DLEstimator[T] = { - estimator.setLearningRate(lr) - } - - def setLearningRateDLClassifier(classifier: DLClassifier[T], lr: Double): DLClassifier[T] = { - classifier.setLearningRate(lr) - } - - def createDLModel(model: Module[T], featureSize: JArrayList[Int]): DLModel[T] = { - new DLModel[T](model, featureSize.asScala.toArray) - } - - def createDLClassifierModel(model: Module[T], - featureSize: JArrayList[Int]): DLClassifierModel[T] = { - new DLClassifierModel[T](model, featureSize.asScala.toArray) - } - - def dlModelTransform(dlModel: DLModel[T], dataSet: DataFrame): DataFrame = { - dlModel.transform(dataSet) - } - - def dlClassifierModelTransform(dlClassifierModel: DLClassifierModel[T], - dataSet: DataFrame): DataFrame = { - dlClassifierModel.transform(dataSet) - } - - def setFeatureSizeDLModel(dlModel: DLModel[T], featureSize: JArrayList[Int]): DLModel[T] = { - dlModel.setFeatureSize(featureSize.asScala.toArray) - } - - def setFeatureSizeDLClassifierModel(dlClassifierModel: DLClassifierModel[T], - featureSize: JArrayList[Int]): DLClassifierModel[T] = { - dlClassifierModel.setFeatureSize(featureSize.asScala.toArray) - } - - def setBatchSizeDLModel(dlModel: DLModel[T], batchSize: Int): DLModel[T] = { - dlModel.setBatchSize(batchSize) - } - - def setBatchSizeDLClassifierModel(dlClassifierModel: DLClassifierModel[T], - batchSize: Int): DLClassifierModel[T] = { - dlClassifierModel.setBatchSize(batchSize) - } - def findGraphNode(model: Graph[T], name: String): ModuleNode[T] = { model.node(name) } @@ -3319,19 +3238,6 @@ class PythonBigDL[T: ClassTag](implicit ev: TensorNumeric[T]) extends Serializab DataSet.imageFrame(imageFrame) } - def dlReadImage(path: String, sc: JavaSparkContext, minParitions: Int): DataFrame = { - val df = DLImageReader.readImages(path, sc.sc, minParitions) - df - } - - def createDLImageTransformer(transformer: FeatureTransformer): DLImageTransformer = { - new DLImageTransformer(transformer) - } - - def dlImageTransform(dlImageTransformer: DLImageTransformer, dataSet: DataFrame): DataFrame = { - dlImageTransformer.transform(dataSet) - } - def getRealClassNameOfJValue(module: AbstractModule[Activity, Activity, T]): String = { module.getClass.getCanonicalName } diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLClassifierSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLClassifierSpec.scala deleted file mode 100644 index 59480fb31e3..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLClassifierSpec.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.optim - -import com.intel.analytics.bigdl.models.lenet.LeNet5 -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.ValidationSummary -import org.apache.log4j.{Level, Logger} -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.SparkContext -import org.apache.spark.ml._ -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - -@deprecated("`DLClassifier` has been migrated to package `com.intel.analytics.bigdl.dlframes`." + - "This will be removed in BigDL 0.6.", "0.5.0") -class DLClassifierSpec extends FlatSpec with Matchers with BeforeAndAfter { - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - Random.setSeed(42) - RNG.setSeed(42) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLClassifier" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(10)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - } - - "An DLClassifier" should "get reasonale accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - assert(dlModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - - "An DLClassifier" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - 
sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "support scalar FEATURE" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(1)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "fit with adam and LBFGS" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - Seq(new LBFGS[Float], new Adam[Float]).foreach { optimMethod => - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setMaxEpoch(2) - .setOptimMethod(optimMethod) - .setLearningRate(0.1) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - } - } - - "An DLClassifier" should "supports validation data and summary" in { - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val logdir = com.google.common.io.Files.createTempDir() - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setEndWhen(Trigger.maxIteration(5)) - .setOptimMethod(new Adam[Float]) - .setLearningRate(0.1) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - classifier.fit(df) - val validationSummary = classifier.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - validationSummary.close() - logdir.deleteOnExit() - } - - "An DLClassifier" should "get the same classification result with BigDL model" in { - Logger.getLogger("org").setLevel(Level.WARN) - Logger.getLogger("akka").setLevel(Level.WARN) - - val model = LeNet5(10) - - // init - val valTrans = new DLClassifierModel[Float](model, Array(28, 28)) - .setBatchSize(4) - - val tensorBuffer = new ArrayBuffer[Data]() - // generate test data with BigDL - val input = Tensor[Float](10, 28, 28).apply1(e => Random.nextFloat()) - val target = model.forward(input).toTensor[Float] - - // test against DLClassifierModel - val inputArr = input.storage().array() - val targetArr = target.max(2)._2.squeeze().storage().array() - (0 until 10).foreach(i => - 
tensorBuffer.append( - Data(targetArr(i), inputArr.slice(i * 28 * 28, (i + 1) * 28 * 28).map(_.toDouble)))) - val rowRDD = sc.parallelize(tensorBuffer) - val testData = sqlContext.createDataFrame(rowRDD) - assert(valTrans.transform(testData).where("prediction=label").count() == testData.count()) - tensorBuffer.clear() - } - - "An DLClassifier" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - assert(pipelineModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - } -} - -private case class Data(label: Double, features: Array[Double]) diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLEstimatorSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLEstimatorSpec.scala deleted file mode 100644 index 60712075202..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/DLEstimatorSpec.scala +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.optim - -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} -import org.apache.log4j.{Level, Logger} -import org.apache.spark.SparkContext -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.ml.{DLEstimator, DLModel, Pipeline, PipelineModel} -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.util.Random - -@deprecated("`DLEstimator` has been migrated to package `com.intel.analytics.bigdl.dlframes`." 
+ - "This will be removed in BigDL 0.6.", "0.5.0") -class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { - val model = new Sequential[Float]() - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - Random.setSeed(42) - RNG.setSeed(42) - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLEstimator" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - - } - - "An DLEstimator" should "get reasonable accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - - "An DLEstimator" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar FEATURE types" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(1), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - 
sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support different LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = MultiLabelSoftMarginCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(2)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, Array(p._2, p._2))))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Array(p._2.toFloat, p._2.toFloat))))).toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Vectors.dense(p._2, p._2))))).toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2.toFloat)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "work with tensor data" in { - - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - .setMaxEpoch(1) - .setBatchSize(nRecords) - - val featureData = Array.tabulate(100)(_ => Tensor(10)) - val labelData = Array.tabulate(100)(_ => Tensor(1).fill(1.0f)) - val miniBatch = sc.parallelize( - featureData.zip(labelData).map(v => - MinibatchData(v._1.storage.array, v._2.storage.array)) - ) - val trainingDF: DataFrame = sqlContext.createDataFrame(miniBatch).toDF("features", "label") - - val dlModel = estimator.fit(trainingDF) - dlModel.transform(trainingDF).collect() - } - - "An DLEstimator" should "support different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(51) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - dlModel.transform(df).count() - } - - "An DLModel" should "support transform with different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = 
ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - assert(df.count() == dlModel.setBatchSize(51).transform(df).count()) - } - - "An DLEstimator" should "throws exception without correct inputs" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val inputs = Array[String]("Feature data", "Label data") - var estimator = new DLEstimator[Float](model, criterion, Array(10), Array(2, 1)). - setFeaturesCol(inputs(0)).setLabelCol(inputs(1)) - - val featureData = Tensor(2, 10) - val labelData = Tensor(2, 1) - val miniBatch = sc.parallelize(Seq( - MinibatchData[Float](featureData.storage().array(), labelData.storage().array()) - )) - var df: DataFrame = sqlContext.createDataFrame(miniBatch).toDF(inputs: _*) - // Spark 1.6 and 2.0 throws different exception here - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "supports training summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(5) - .setTrainSummary(TrainSummary(logdir.getPath, "DLEstimatorTrain")) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - val trainSummary = estimator.getTrainSummary.get - val losses = trainSummary.readScalar("Loss") - assert(losses.length == 5) - trainSummary.close() - logdir.deleteOnExit() - } - - "An DLEstimator" should "supports validation data and summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - val dlModel = estimator.fit(df) - val validationSummary = estimator.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - assert(losses.length == 5) - validationSummary.close() - logdir.deleteOnExit() - } - - "An DLEstimator" should "throws exception when EndWhen and MaxEpoch are set" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setMaxEpoch(5) - - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - 
smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - val correct = pipelineModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - } -} - -private case class MinibatchData[T](featureData : Array[T], labelData : Array[T]) - -object DLEstimatorSpec { - // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) - def generateTestInput( - numRecords: Int, - weight: Array[Double], - intercept: Double, - seed: Long): Seq[(Array[Double], Double)] = { - val rnd = new Random(seed) - val data = (1 to numRecords) - .map( i => Array.tabulate(weight.length)(index => rnd.nextDouble() * 2 - 1)) - .map { record => - val y = record.zip(weight).map(t => t._1 * t._2).sum - +intercept + 0.01 * rnd.nextGaussian() - val label = if (y > 0) 2.0 else 1.0 - (record, label) - } - data - } -} diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLClassifierSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLClassifierSpec.scala deleted file mode 100644 index 2d9eb786a2c..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLClassifierSpec.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.models.lenet.LeNet5 -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.optim.{Adam, LBFGS, Loss, Trigger} -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.ValidationSummary -import org.apache.log4j.{Level, Logger} -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.SparkContext -import org.apache.spark.ml._ -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random - -class DLClassifierSpec extends FlatSpec with Matchers with BeforeAndAfter { - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - Random.setSeed(42) - RNG.setSeed(42) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLClassifier" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(10)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - } - - "An DLClassifier" should "get reasonale accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - assert(dlModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - - "An DLClassifier" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - 
dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "support scalar FEATURE" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(1)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(2)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLClassifier" should "support binary classification" in { - val model = new Sequential().add(Linear[Float](6, 1)).add(Sigmoid[Float]) - val criterion = BCECriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setLearningRate(0.1) - .setBatchSize(2) - .setEndWhen(Trigger.maxIteration(10)) - - Array( - sqlContext.createDataFrame(sc.parallelize( - smallData.map(p => (p._1.map(_.toFloat), p._2.toFloat - 1)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2 - 1)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = classifier.fit(df) - val result = dlModel.transform(df).collect() - val accuracy = result.count(v => v.get(1) == v.get(2)).toDouble / smallData.size - accuracy should be > math.max(smallData.count(_._2 == 1), - smallData.count(_._2 == 2)).toDouble / smallData.size - } - } - - "An DLClassifier" should "fit with adam and LBFGS" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - Seq(new LBFGS[Float], new Adam[Float]).foreach { optimMethod => - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setMaxEpoch(2) - .setOptimMethod(optimMethod) - .setLearningRate(0.1) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = classifier.fit(df) - dlModel.isInstanceOf[DLClassifierModel[_]] should be(true) - } - } - - "An DLClassifier" should "supports validation data and summary" in { - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val logdir = com.google.common.io.Files.createTempDir() - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val classifier = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setEndWhen(Trigger.maxIteration(5)) - .setOptimMethod(new Adam[Float]) - .setLearningRate(0.1) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - classifier.fit(df) - val validationSummary = classifier.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - validationSummary.close() - logdir.deleteOnExit() - } - - "An DLClassifier" should "get the same classification result with BigDL model" in { - Logger.getLogger("org").setLevel(Level.WARN) - Logger.getLogger("akka").setLevel(Level.WARN) - - val model = LeNet5(10) - - // init - 
val valTrans = new DLClassifierModel[Float](model, Array(28, 28)) - .setBatchSize(4) - - val tensorBuffer = new ArrayBuffer[Data]() - // generate test data with BigDL - val input = Tensor[Float](10, 28, 28).apply1(e => Random.nextFloat()) - val target = model.forward(input).toTensor[Float] - - // test against DLClassifierModel - val inputArr = input.storage().array() - val targetArr = target.max(2)._2.squeeze().storage().array() - (0 until 10).foreach(i => - tensorBuffer.append( - Data(targetArr(i), inputArr.slice(i * 28 * 28, (i + 1) * 28 * 28).map(_.toDouble)))) - val rowRDD = sc.parallelize(tensorBuffer) - val testData = sqlContext.createDataFrame(rowRDD) - assert(valTrans.transform(testData).where("prediction=label").count() == testData.count()) - tensorBuffer.clear() - } - - "An DLClassifier" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLClassifier[Float](model, criterion, Array(6)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - assert(pipelineModel.transform(df).where("prediction=label").count() > nRecords * 0.8) - } - } -} - -private case class Data(label: Double, features: Array[Double]) diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLEstimatorSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLEstimatorSpec.scala deleted file mode 100644 index 2126f556c14..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLEstimatorSpec.scala +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.nn._ -import com.intel.analytics.bigdl.optim.{LBFGS, Loss, Trigger} -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} -import org.apache.spark.SparkContext -import org.apache.spark.ml.feature.MinMaxScaler -import org.apache.spark.ml.{Pipeline, PipelineModel} -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.util.Random - -class DLEstimatorSpec extends FlatSpec with Matchers with BeforeAndAfter { - val model = new Sequential[Float]() - var sc : SparkContext = _ - var sqlContext : SQLContext = _ - var smallData: Seq[(Array[Double], Double)] = _ - val nRecords = 100 - val maxEpoch = 20 - - before { - Random.setSeed(42) - RNG.setSeed(42) - val conf = Engine.createSparkConf().setAppName("Test DLEstimator").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - smallData = DLEstimatorSpec.generateTestInput( - nRecords, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0), -1.0, 42L) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "An DLEstimator" should "has correct default params" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - assert(estimator.getFeaturesCol == "features") - assert(estimator.getLabelCol == "label") - assert(estimator.getMaxEpoch == 50) - assert(estimator.getBatchSize == 1) - assert(estimator.getLearningRate == 1e-3) - assert(estimator.getLearningRateDecay == 0) - - } - - "An DLEstimator" should "get reasonable accuracy" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - val correct = dlModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - - "An DLEstimator" should "support different FEATURE types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2)))) - .toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (Vectors.dense(p._1), 
p._2)))) - .toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar FEATURE types" in { - val model = new Sequential().add(Linear[Float](1, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(1), Array(1)) - .setBatchSize(2) - // intentionally set low since this only validates data format compatibility - .setEndWhen(Trigger.maxIteration(1)) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head.toFloat, p._2)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1.head, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support different LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = MultiLabelSoftMarginCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(2)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, Array(p._2, p._2))))) - .toDF("features", "label"), // Array[Double] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Array(p._2.toFloat, p._2.toFloat))))).toDF("features", "label"), // Array[Float] - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, - Vectors.dense(p._2, p._2))))).toDF("features", "label") // MLlib Vector - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "support scalar LABEL types" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - // intentionally set low since this only validates data format compatibitliy - .setEndWhen(Trigger.maxIteration(1)) - .setBatchSize(2) - - Array( - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2.toFloat)))) - .toDF("features", "label"), // Float - sqlContext.createDataFrame(sc.parallelize(smallData.map(p => (p._1, p._2)))) - .toDF("features", "label") // Double - // TODO: add ML Vector when ut for Spark 2.0+ is ready - ).foreach { df => - val dlModel = estimator.fit(df) - dlModel.transform(df).collect() - } - } - - "An DLEstimator" should "work with tensor data" in { - - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(10), Array(1)) - .setMaxEpoch(1) - .setBatchSize(nRecords) - - val featureData = Array.tabulate(100)(_ => Tensor(10)) - val labelData = Array.tabulate(100)(_ => Tensor(1).fill(1.0f)) - val miniBatch = sc.parallelize( - featureData.zip(labelData).map(v => - MinibatchData(v._1.storage.array, v._2.storage.array)) - ) - val trainingDF: DataFrame = sqlContext.createDataFrame(miniBatch).toDF("features", "label") - - val dlModel = estimator.fit(trainingDF) - 
dlModel.transform(trainingDF).collect() - } - - "An DLEstimator" should "support different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(51) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - dlModel.isInstanceOf[DLModel[_]] should be(true) - dlModel.transform(df).count() - } - - "An DLModel" should "support transform with different batchSize" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - val data = sc.parallelize(smallData) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - val dlModel = estimator.fit(df) - assert(df.count() == dlModel.setBatchSize(51).transform(df).count()) - } - - "An DLEstimator" should "throws exception without correct inputs" in { - val model = Linear[Float](10, 1) - val criterion = ClassNLLCriterion[Float]() - val inputs = Array[String]("Feature data", "Label data") - var estimator = new DLEstimator[Float](model, criterion, Array(10), Array(2, 1)). - setFeaturesCol(inputs(0)).setLabelCol(inputs(1)) - - val featureData = Tensor(2, 10) - val labelData = Tensor(2, 1) - val miniBatch = sc.parallelize(Seq( - MinibatchData[Float](featureData.storage().array(), labelData.storage().array()) - )) - var df: DataFrame = sqlContext.createDataFrame(miniBatch).toDF(inputs: _*) - // Spark 1.6 and 2.0 throws different exception here - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "supports training summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(nRecords) - .setMaxEpoch(5) - .setTrainSummary(TrainSummary(logdir.getPath, "DLEstimatorTrain")) - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - - val dlModel = estimator.fit(df) - val trainSummary = estimator.getTrainSummary.get - val losses = trainSummary.readScalar("Loss") - assert(losses.length == 5) - trainSummary.close() - logdir.deleteOnExit() - } - - "An DLEstimator" should "supports validation data and summary" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setValidation(Trigger.severalIteration(1), df, Array(new Loss[Float]()), 2) - .setValidationSummary(ValidationSummary(logdir.getPath, "DLEstimatorValidation")) - - val dlModel = estimator.fit(df) - val validationSummary = estimator.getValidationSummary.get - val losses = validationSummary.readScalar("Loss") - assert(losses.length == 5) - validationSummary.close() - logdir.deleteOnExit() - } - - "An 
DLEstimator" should "throws exception when EndWhen and MaxEpoch are set" in { - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val logdir = com.google.common.io.Files.createTempDir() - - val data = sc.parallelize(smallData) - val df = sqlContext.createDataFrame(data).toDF("features", "label") - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setBatchSize(4) - .setEndWhen(Trigger.maxIteration(5)) - .setMaxEpoch(5) - - intercept[Exception] { - estimator.fit(df) - } - } - - "An DLEstimator" should "works in ML pipeline" in { - var appSparkVersion = org.apache.spark.SPARK_VERSION - if (appSparkVersion.trim.startsWith("1")) { - val data = sc.parallelize( - smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))) - val df: DataFrame = sqlContext.createDataFrame(data).toDF("features", "label") - - val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled") - .setMax(1).setMin(-1) - val model = new Sequential().add(Linear[Float](6, 2)).add(LogSoftMax[Float]) - val criterion = ClassNLLCriterion[Float]() - val estimator = new DLEstimator[Float](model, criterion, Array(6), Array(1)) - .setOptimMethod(new LBFGS[Float]()) - .setLearningRate(0.1) - .setBatchSize(nRecords) - .setMaxEpoch(maxEpoch) - .setFeaturesCol("scaled") - val pipeline = new Pipeline().setStages(Array(scaler, estimator)) - - val pipelineModel = pipeline.fit(df) - pipelineModel.isInstanceOf[PipelineModel] should be(true) - val correct = pipelineModel.transform(df).select("label", "prediction").rdd.filter { - case Row(label: Double, prediction: Seq[_]) => - label == prediction.indexOf(prediction.asInstanceOf[Seq[Double]].max) + 1 - }.count() - assert(correct > nRecords * 0.8) - } - } -} - -private case class MinibatchData[T](featureData : Array[T], labelData : Array[T]) - -object DLEstimatorSpec { - // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) - def generateTestInput( - numRecords: Int, - weight: Array[Double], - intercept: Double, - seed: Long): Seq[(Array[Double], Double)] = { - val rnd = new Random(seed) - val data = (1 to numRecords) - .map( i => Array.tabulate(weight.length)(index => rnd.nextDouble() * 2 - 1)) - .map { record => - val y = record.zip(weight).map(t => t._1 * t._2).sum - +intercept + 0.01 * rnd.nextGaussian() - val label = if (y > 0) 2.0 else 1.0 - (record, label) - } - data - } -} diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageReaderSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageReaderSpec.scala deleted file mode 100644 index e6ba6d1e98e..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageReaderSpec.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.transform.vision.image.{ImageFrame, MatToTensor} -import com.intel.analytics.bigdl.transform.vision.image.augmentation.Resize -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, SQLContext} -import org.opencv.core.CvType -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} - -import scala.util.Random - -class DLImageReaderSpec extends FlatSpec with Matchers with BeforeAndAfter { - - var sc : SparkContext = _ - var sQLContext: SQLContext = _ - val pascalResource = getClass.getClassLoader.getResource("pascal/") - private val imageNetResource = getClass.getClassLoader.getResource("imagenet/") - - before { - val conf = Engine.createSparkConf().setAppName("Test DLImageReader").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sQLContext = new SQLContext(sc) - - Random.setSeed(42) - RNG.setSeed(42) - - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "DLImageReader" should "has correct result for pascal" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val r = imageDF.head().getAs[Row](0) - assert(r.getString(0).endsWith("000025.jpg")) - assert(r.getInt(1) == 375) - assert(r.getInt(2) == 500) - assert(r.getInt(3) == 3) - assert(r.getInt(4) == CvType.CV_8UC3) - assert(r.getAs[Array[Byte]](5).length == 95959) - } - - "DLImageReader" should "has correct result for imageNet" in { - val imageDirectory = imageNetResource + "n02110063/" - val imageDF = DLImageReader.readImages(imageDirectory, sc) - assert(imageDF.count() == 3) - val expectedRows = Seq( - (imageDirectory + "n02110063_8651.JPEG", 99, 129, 3, CvType.CV_8UC3), - (imageDirectory + "n02110063_11239.JPEG", 333, 500, 3, CvType.CV_8UC3), - (imageDirectory + "n02110063_15462.JPEG", 332, 500, 3, CvType.CV_8UC3) - ) - val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r => - (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4)) - } - assert (expectedRows.toSet == actualRows.toSet) - } - - "DLImageReader" should "has correct result for imageNet with channel 1 and 4" in { - val imageDirectory = imageNetResource + "n99999999/" - val imageDF = DLImageReader.readImages(imageDirectory, sc) - assert(imageDF.count() == 3) - val expectedRows = Seq( - (imageDirectory + "n02105855_2933.JPEG", 189, 213, 4, CvType.CV_8UC4), - (imageDirectory + "n02105855_test1.bmp", 527, 556, 1, CvType.CV_8UC1), - (imageDirectory + "n03000134_4970.JPEG", 480, 640, 3, CvType.CV_8UC3) - ) - val actualRows = imageDF.rdd.collect().map(r => r.getAs[Row](0)).map { r => - (r.getString(0), r.getInt(1), r.getInt(2), r.getInt(3), r.getInt(4)) - } - assert (expectedRows.toSet == actualRows.toSet) - } - - "DLImageReader" should "read recursively by wildcard path" in { - val imageDF = DLImageReader.readImages(imageNetResource.getFile + "*", sc) - assert(imageDF.count() == 11) - } - - "DLImageReader" should "read from multiple path" in { - val imageDirectory1 = imageNetResource + "n02110063/" - val imageDirectory2 = imageNetResource + "n99999999/" - val imageDF = DLImageReader.readImages(imageDirectory1 + "," + imageDirectory2, sc) - assert(imageDF.count() == 6) - } - - "read gray scale image" should "work" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - 
assert(df.count() == 1) - val r = df.head().getAs[Row](0) - assert(r.getString(0).endsWith("gray.bmp")) - assert(r.getInt(1) == 50) - assert(r.getInt(2) == 50) - assert(r.getInt(3) == 1) - assert(r.getInt(4) == CvType.CV_8UC1) - } -} diff --git a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageTransformerSpec.scala b/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageTransformerSpec.scala deleted file mode 100644 index 4c2fd5cdcfd..00000000000 --- a/scala/dllib/src/test/scala/com/intel/analytics/bigdl/dllib/optim/dlframes/DLImageTransformerSpec.scala +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.transform.vision.image.{ImageFrame, ImageFrameToSample, MatToTensor} -import com.intel.analytics.bigdl.transform.vision.image.augmentation.{CenterCrop, ChannelNormalize, Resize} -import com.intel.analytics.bigdl.utils.Engine -import com.intel.analytics.bigdl.utils.RandomGenerator.RNG -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Row, SQLContext} -import org.opencv.core.CvType -import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericFloat - -import scala.util.Random - -class DLImageTransformerSpec extends FlatSpec with Matchers with BeforeAndAfter { - private var sc : SparkContext = _ - private var sqlContext : SQLContext = _ - private val pascalResource = getClass.getClassLoader.getResource("pascal/") - private val imageNetResource = getClass.getClassLoader.getResource("imagenet/") - - before { - val conf = Engine.createSparkConf().setAppName("Test DLImageTransfomer").setMaster("local[1]") - sc = SparkContext.getOrCreate(conf) - sqlContext = new SQLContext(sc) - Random.setSeed(42) - RNG.setSeed(42) - Engine.init - } - - after{ - if (sc != null) { - sc.stop() - } - } - - "DLTransformer" should "setters work" in { - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val trans = new DLImageTransformer(transformer) - .setInputCol("image1") - .setOutputCol("features1") - assert(trans.getInputCol == "image1") - assert(trans.getOutputCol == "features1") - } - - "DLTransformer" should "has correct result with pascal images" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("pascal/000025.jpg")) - assert(r.getInt(1) == 224) - assert(r.getInt(2) == 224) - assert(r.getInt(3) == 3) - assert(r.getInt(4) 
== CvType.CV_32FC3) - assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep) - } - - "DLTransformer" should "has correct result without MatToTensor" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("pascal/000025.jpg")) - assert(r.getInt(1) == 224) - assert(r.getInt(2) == 224) - assert(r.getInt(3) == 3) - assert(r.getInt(4) == CvType.CV_32FC3) - assert(r.getSeq[Float](5).take(6).toArray.deep == Array(-30, -50, -69, -84, -46, -25).deep) - } - - "DLTransformer" should "ensure imf2Row and Row2Imf reversible" in { - val imageDF = DLImageReader.readImages(pascalResource.getFile, sc) - assert(imageDF.count() == 1) - val transformer = Resize(256, 256) -> CenterCrop(224, 224) -> - ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - val transformedDF = new DLImageTransformer(transformer) - .setInputCol("image") - .setOutputCol("features") - .transform(imageDF) - val r = transformedDF.select("features").rdd.first().getAs[Row](0) - val convertedR = DLImageSchema.imf2Row(DLImageSchema.row2IMF(r)) - - assert(r.getSeq[Float](5).toArray.deep == convertedR.getAs[Array[Float]](5).deep) - } - - "DLTransformer" should "transform gray scale image" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]()) - .setInputCol("image") - .setOutputCol("features") - val r = dlTransformer.transform(df).select("features").rdd.first().getAs[Row](0) - assert(r.getString(0).endsWith("gray.bmp")) - assert(r.getInt(1) == 28) - assert(r.getInt(2) == 28) - assert(r.getInt(3) == 1) - assert(r.getInt(4) == CvType.CV_32FC1) - } - - "DLTransformer" should "report error with same input and output columns" in { - val resource = getClass().getClassLoader().getResource("gray/gray.bmp") - val df = DLImageReader.readImages(resource.getFile, sc) - val dlTransformer = new DLImageTransformer(Resize(28, 28) -> MatToTensor[Float]()) - .setInputCol("image") - .setOutputCol("image") - intercept[IllegalArgumentException] { - val transformed = dlTransformer.transform(df) - } - } - -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala deleted file mode 100644 index e57ee03d168..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLClassifier.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.tensor.Tensor -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.{Criterion, Module} -import org.apache.spark.ml.adapter.SchemaUtils -import org.apache.spark.ml.param.ParamMap -import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.types._ - -import scala.reflect.ClassTag - -/** - * [[DLClassifier]] is a specialized [[DLEstimator]] that simplifies the data format for - * classification tasks. It only supports label column of DoubleType. - * and the fitted [[DLClassifierModel]] will have the prediction column of DoubleType. - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifier` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLClassifier[T: ClassTag]( - @transient override val model: Module[T], - override val criterion : Criterion[T], - override val featureSize : Array[Int], - override val uid: String = Identifiable.randomUID("dlClassifier") - )(implicit ev: TensorNumeric[T]) - extends DLEstimator[T](model, criterion, featureSize, Array(1)) { - - override protected def wrapBigDLModel( - m: Module[T], featureSize: Array[Int]): DLClassifierModel[T] = { - val dlModel = new DLClassifierModel[T](m, featureSize) - copyValues(dlModel.setParent(this)).asInstanceOf[DLClassifierModel[T]] - } - - override def transformSchema(schema : StructType): StructType = { - validateParams(schema) - SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) - } - - override def copy(extra: ParamMap): DLClassifier[T] = { - copyValues(new DLClassifier(model, criterion, featureSize), extra) - } -} - -/** - * [[DLClassifierModel]] is a specialized [[DLModel]] for classification tasks. - * The prediction column will have the datatype of Double. - * - * @param model BigDL module to be optimized - * @param featureSize The size (Tensor dimensions) of the feature data. - */ -@deprecated("`DLClassifierModel` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLClassifierModel[T: ClassTag]( - @transient override val model: Module[T], - featureSize : Array[Int], - override val uid: String = "DLClassifierModel" - )(implicit ev: TensorNumeric[T]) extends DLModel[T](model, featureSize) { - - protected override def outputToPrediction(output: Tensor[T]): Any = { - if (output.size().deep == Array(1).deep) { - val raw = ev.toType[Double](output.toArray().head) - if (raw > 0.5) 1.0 else 0.0 - } else { - ev.toType[Double](output.max(1)._2.valueAt(1)) - } - } - - override def transformSchema(schema : StructType): StructType = { - validateDataType(schema, $(featuresCol)) - SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType) - } -} - diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala deleted file mode 100644 index 9441be959f5..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLEstimator.scala +++ /dev/null @@ -1,446 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.{Criterion, Module} -import com.intel.analytics.bigdl.dataset._ -import com.intel.analytics.bigdl.models.utils.ModelBroadcast -import com.intel.analytics.bigdl.optim._ -import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric -import com.intel.analytics.bigdl.tensor.{Storage, Tensor} -import com.intel.analytics.bigdl.utils.T -import com.intel.analytics.bigdl.visualization.{TrainSummary, ValidationSummary} -import org.apache.spark.ml.adapter.{HasFeaturesCol, HasPredictionCol, SchemaUtils} -import org.apache.spark.ml.{DLEstimatorBase, DLTransformerBase, VectorCompatibility} -import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators, _} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row} - -import scala.reflect.ClassTag - -private[dlframes] trait HasBatchSize extends Params { - - final val batchSize: Param[Int] = new Param[Int](this, "batchSize", "batchSize") - - def getBatchSize: Int = $(batchSize) -} - -/** - * Common trait for DLEstimator and DLModel - */ -private[dlframes] trait DLParams[@specialized(Float, Double) T] extends HasFeaturesCol - with HasPredictionCol with VectorCompatibility with HasBatchSize { - - /** - * When to stop the training, passed in a [[Trigger]]. E.g. Trigger.maxIterations - */ - final val endWhen = new Param[Trigger](this, "endWhen", "Trigger to stop the training") - - def getEndWhen: Trigger = $(endWhen) - - /** - * learning rate for the optimizer in the DLEstimator. - * Default: 0.001 - */ - final val learningRate = new DoubleParam( - this, "learningRate", "learningRate", ParamValidators.gt(0)) - - def getLearningRate: Double = $(learningRate) - - /** - * learning rate decay for each iteration. - * Default: 0 - */ - final val learningRateDecay = new DoubleParam(this, "learningRateDecay", "learningRateDecay") - - def getLearningRateDecay: Double = $(learningRateDecay) - - /** - * Number of max Epoch for the training, an epoch refers to a traverse over the training data - * Default: 50 - */ - final val maxEpoch = new IntParam(this, "maxEpoch", "number of max Epoch", ParamValidators.gt(0)) - - def getMaxEpoch: Int = $(maxEpoch) - - /** - * optimization method to be used. BigDL supports many optimization methods like Adam, - * SGD and LBFGS. Refer to package com.intel.analytics.bigdl.optim for all the options. - * Default: SGD - */ - final val optimMethod = new Param[OptimMethod[T]](this, "optimMethod", "optimMethod") - - def getOptimMethod: OptimMethod[T] = $(optimMethod) - - setDefault(batchSize -> 1) - - /** - * Validate if feature and label columns are of supported data types. 
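For orientation while reading the removed `DLParams` trait: the type check below (together with the tests earlier in this diff) is what defined the accepted feature/label column formats. A minimal sketch, reusing the `sc`, `sqlContext` and `smallData` fixtures from the deleted specs above, of DataFrames that pass the validation:

    // Each of these "features" columns is accepted: Array[Double], Array[Float], MLlib Vector.
    val asDoubleArray = sqlContext.createDataFrame(sc.parallelize(smallData))
      .toDF("features", "label")
    val asFloatArray = sqlContext.createDataFrame(
      sc.parallelize(smallData.map(p => (p._1.map(_.toFloat), p._2))))
      .toDF("features", "label")
    val asVector = sqlContext.createDataFrame(
      sc.parallelize(smallData.map(p => (org.apache.spark.mllib.linalg.Vectors.dense(p._1), p._2))))
      .toDF("features", "label")
    // Scalar DoubleType and FloatType columns are accepted as well (see the scalar FEATURE tests above).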
- * Default: 0 - */ - protected def validateDataType(schema: StructType, colName: String): Unit = { - val dataTypes = Seq( - new ArrayType(DoubleType, false), - new ArrayType(DoubleType, true), - new ArrayType(FloatType, false), - new ArrayType(FloatType, true), - DoubleType, - FloatType - ) ++ validVectorTypes - - // TODO use SchemaUtils.checkColumnTypes after convert to 2.0 - val actualDataType = schema(colName).dataType - require(dataTypes.exists(actualDataType.equals), - s"Column $colName must be of type equal to one of the following types: " + - s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.") - } - - /** - * Get conversion function to extract data from original DataFrame - * Default: 0 - */ - protected def getConvertFunc(colType: DataType): (Row, Int) => Seq[AnyVal] = { - colType match { - case ArrayType(DoubleType, false) => - (row: Row, index: Int) => row.getSeq[Double](index) - case ArrayType(DoubleType, true) => - (row: Row, index: Int) => row.getSeq[Double](index) - case ArrayType(FloatType, false) => - (row: Row, index: Int) => row.getSeq[Float](index) - case ArrayType(FloatType, true) => - (row: Row, index: Int) => row.getSeq[Float](index) - case DoubleType => - (row: Row, index: Int) => Seq[Double](row.getDouble(index)) - case FloatType => - (row: Row, index: Int) => Seq[Float](row.getFloat(index)) - case _ => - if (colType.typeName.contains("vector")) { - (row: Row, index: Int) => getVectorSeq(row, colType, index) - } else { - throw new IllegalArgumentException( - s"$colType is not a supported (Unexpected path).") - } - } - } -} - - -/** - * [[DLEstimator]] helps to train a BigDL Model with the Spark ML Estimator/Transfomer pattern, - * thus Spark users can conveniently fit BigDL into Spark ML pipeline. - * - * [[DLEstimator]] supports feature and label data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * - * User should specify the feature data dimensions and label data dimensions via the constructor - * parameters featureSize and labelSize respectively. Internally the feature and label data are - * converted to BigDL tensors, to further train a BigDL model efficiently. - * - * For details usage, please refer to examples in package - * com.intel.analytics.bigdl.example.MLPipeline - * - * @param model BigDL module to be optimized - * @param criterion BigDL criterion method - * @param featureSize The size (Tensor dimensions) of the feature data. e.g. an image may be with - * width * height = 28 * 28, featureSize = Array(28, 28). - * @param labelSize The size (Tensor dimensions) of the label data. - */ -@deprecated("`DLEstimator` is deprecated." 
+ - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLEstimator[@specialized(Float, Double) T: ClassTag]( - @transient val model: Module[T], - val criterion : Criterion[T], - val featureSize : Array[Int], - val labelSize : Array[Int], - override val uid: String = "DLEstimator")(implicit ev: TensorNumeric[T]) - extends DLEstimatorBase[DLEstimator[T], DLModel[T]] with DLParams[T] { - - def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName) - - def setLabelCol(labelColName : String) : this.type = set(labelCol, labelColName) - - def setPredictionCol(value: String): this.type = set(predictionCol, value) - - def setBatchSize(value: Int): this.type = set(batchSize, value) - - def setEndWhen(trigger: Trigger): this.type = set(endWhen, trigger) - - def setLearningRate(value: Double): this.type = set(learningRate, value) - setDefault(learningRate -> 1e-3) - - def setLearningRateDecay(value: Double): this.type = set(learningRateDecay, value) - setDefault(learningRateDecay -> 0.0) - - def setMaxEpoch(value: Int): this.type = set(maxEpoch, value) - setDefault(maxEpoch -> 50) - - def setOptimMethod(value: OptimMethod[T]): this.type = set(optimMethod, value) - set(optimMethod, new SGD[T]) - - @transient private var trainSummary: Option[TrainSummary] = None - - def getTrainSummary: Option[TrainSummary] = trainSummary - - /** - * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the - * training data, which can be used for visualization via Tensorboard. - * Use setTrainSummary to enable train logger. Then the log will be saved to - * logDir/appName/train as specified by the parameters of TrainSummary. - * - * Default: Not enabled - */ - def setTrainSummary(value: TrainSummary): this.type = { - this.trainSummary = Some(value) - this - } - - @transient private var validationSummary: Option[ValidationSummary] = None - - /** - * Statistics (LearningRate, Loss, Throughput, Parameters) collected during training for the - * validation data if validation data is set, which can be used for visualization via - * Tensorboard. Use setValidationSummary to enable validation logger. Then the log will be - * saved to logDir/appName/ as specified by the parameters of validationSummary. 
- * - * Default: None - */ - def getValidationSummary: Option[ValidationSummary] = validationSummary - - /** - * Enable validation Summary - */ - def setValidationSummary(value: ValidationSummary): this.type = { - this.validationSummary = Some(value) - this - } - - @transient private var validationTrigger: Option[Trigger] = None - @transient private var validationDF: DataFrame = _ - @transient private var validationMethods: Array[ValidationMethod[T]] = _ - @transient private var validationBatchSize: Int = 0 - /** - * Set a validate evaluation during training - * - * @param trigger how often to evaluation validation set - * @param validationDF validate data set - * @param vMethods a set of validation method [[ValidationMethod]] - * @param batchSize batch size for validation - * @return this optimizer - */ - def setValidation(trigger: Trigger, validationDF: DataFrame, - vMethods : Array[ValidationMethod[T]], batchSize: Int) - : this.type = { - this.validationTrigger = Some(trigger) - this.validationDF = validationDF - this.validationMethods = vMethods - this.validationBatchSize = batchSize - this - } - - protected def validateParams(schema : StructType): Unit = { - validateDataType(schema, $(featuresCol)) - validateDataType(schema, $(labelCol)) - if(isSet(endWhen) && isSet(maxEpoch)) { - throw new IllegalArgumentException(s"endWhen and maxEpoch cannot be both set") - } - if (validationTrigger.isEmpty && validationSummary.isDefined) { - throw new IllegalArgumentException( - s"validationSummary is only valid if validation data is set.") - } - } - - override def transformSchema(schema : StructType): StructType = { - validateParams(schema) - SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false)) - } - - protected override def internalFit(dataFrame: DataFrame): DLModel[T] = { - val localFeatureCol = $(featuresCol) - val localLabelCol = $(labelCol) - - def getSamples(dataFrame: DataFrame): RDD[Sample[T]] = { - val featureType = dataFrame.schema(localFeatureCol).dataType - val featureColIndex = dataFrame.schema.fieldIndex(localFeatureCol) - val labelType = dataFrame.schema(localLabelCol).dataType - val labelColIndex = dataFrame.schema.fieldIndex(localLabelCol) - - val featureFunc = getConvertFunc(featureType) - val labelFunc = getConvertFunc(labelType) - - val featureAndLabel: RDD[(Seq[AnyVal], Seq[AnyVal])] = dataFrame.rdd.map { row => - val features = featureFunc(row, featureColIndex) - val labels = labelFunc(row, labelColIndex) - (features, labels) - } - - val samples = featureAndLabel.map { case (f, l) => - // convert feature and label data type to the same type with model - // TODO: investigate to reduce memory consumption during conversion. 
- val feature = f.head match { - case dd: Double => f.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => f.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - val label = l.head match { - case dd: Double => l.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => l.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - (feature, label) - }.map { case (feature, label) => - Sample(Tensor(feature.toArray, featureSize), Tensor(label.toArray, labelSize)) - } - samples - } - - val trainingSamples = getSamples(dataFrame) - val state = T("learningRate" -> $(learningRate), "learningRateDecay" -> $(learningRateDecay)) - val endTrigger = if (isSet(endWhen)) $(endWhen) else Trigger.maxEpoch($(maxEpoch)) - val optimizer = Optimizer(model, trainingSamples, criterion, $(batchSize)) - .setState(state) - .setOptimMethod($(optimMethod)) - .setEndWhen(endTrigger) - - if (validationTrigger.isDefined) { - val validationSamples = getSamples(validationDF) - optimizer.setValidation( - validationTrigger.get, - validationSamples, - validationMethods, - validationBatchSize) - if (this.validationSummary.isDefined) { - optimizer.setValidationSummary(this.validationSummary.get) - } - } - - if (this.trainSummary.isDefined) { - optimizer.setTrainSummary(this.trainSummary.get) - } - - val optimizedModel = optimizer.optimize() - wrapBigDLModel(optimizedModel, featureSize) - } - - /** - * sub classes can extend the method and return required model for different transform tasks - */ - protected def wrapBigDLModel(m: Module[T], featureSize: Array[Int]): DLModel[T] = { - val dlModel = new DLModel[T](m, featureSize) - copyValues(dlModel.setParent(this)) - } - - override def copy(extra: ParamMap): DLEstimator[T] = { - copyValues(new DLEstimator(model, criterion, featureSize, labelSize), extra) - } -} - -/** - * [[DLModel]] helps embed a BigDL model into a Spark Transformer, thus Spark users can - * conveniently merge BigDL into Spark ML pipeline. - * [[DLModel]] supports feature data in the format of - * Array[Double], Array[Float], org.apache.spark.mllib.linalg.{Vector, VectorUDT}, - * org.apache.spark.ml.linalg.{Vector, VectorUDT}, Double and Float. - * Internally [[DLModel]] use features column as storage of the feature data, and create - * Tensors according to the constructor parameter featureSize. - * - * [[DLModel]] is compatible with both spark 1.5-plus and 2.0 by extending ML Transformer. - * @param model trainned BigDL models to use in prediction. - * @param featureSize The size (Tensor dimensions) of the feature data. (e.g. an image may be with - * featureSize = 28 * 28). - */ -@deprecated("`DLModel` is deprecated." + - "com.intel.analytics.bigdl.dlframes is deprecated in BigDL 0.11, " + - "and will be removed in future releases", "0.10.0") -class DLModel[@specialized(Float, Double) T: ClassTag]( - @transient val model: Module[T], - var featureSize : Array[Int], - override val uid: String = "DLModel" - )(implicit ev: TensorNumeric[T]) - extends DLTransformerBase[DLModel[T]] with DLParams[T] with HasBatchSize { - - def setFeaturesCol(featuresColName: String): this.type = set(featuresCol, featuresColName) - - def setPredictionCol(value: String): this.type = set(predictionCol, value) - - def setFeatureSize(value: Array[Int]): this.type = { - this.featureSize = value - this - } - - def setBatchSize(value: Int): this.type = set(batchSize, value) - - def getFeatureSize: Array[Int] = this.featureSize - - /** - * Perform a prediction on featureCol, and write result to the predictionCol. 
- */ - protected override def internalTransform(dataFrame: DataFrame): DataFrame = { - val featureType = dataFrame.schema($(featuresCol)).dataType - val featureColIndex = dataFrame.schema.fieldIndex($(featuresCol)) - val featureFunc = getConvertFunc(featureType) - val sc = dataFrame.sqlContext.sparkContext - val modelBroadCast = ModelBroadcast[T]().broadcast(sc, model.evaluate()) - val localBatchSize = $(batchSize) - val transformerBC = sc.broadcast(SampleToMiniBatch[T](localBatchSize)) - - val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - val localModel = modelBroadCast.value() - val transformer = transformerBC.value.cloneTransformer() - rowIter.grouped(localBatchSize).flatMap { rowBatch => - val samples = rowBatch.map { row => - val features = featureFunc(row, featureColIndex) - val featureBuffer = features.head match { - case dd: Double => features.asInstanceOf[Seq[Double]].map(ev.fromType(_)) - case ff: Float => features.asInstanceOf[Seq[Float]].map(ev.fromType(_)) - } - Sample(Tensor(featureBuffer.toArray, featureSize)) - }.toIterator - val predictions = transformer(samples).flatMap { batch => - val batchResult = localModel.forward(batch.getInput()) - batchResult.toTensor.split(1).map(outputToPrediction) - } - rowBatch.toIterator.zip(predictions).map { case (row, predict) => - Row.fromSeq(row.toSeq ++ Seq(predict)) - } - } - } - - val resultSchema = transformSchema(dataFrame.schema) - dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) - } - - protected def outputToPrediction(output: Tensor[T]): Any = { - output.clone().storage().array().map(ev.toType[Double]) - } - - override def transformSchema(schema : StructType): StructType = { - validateDataType(schema, $(featuresCol)) - SchemaUtils.appendColumn(schema, $(predictionCol), ArrayType(DoubleType, false)) - } - - override def copy(extra: ParamMap): DLModel[T] = { - val copied = new DLModel(model, featureSize, uid).setParent(parent) - copyValues(copied, extra) - } -} - -// TODO, add save/load -object DLModel { - - -} - diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala deleted file mode 100644 index 2df4112b3d7..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageReader.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.tensor.{Storage, Tensor} -import com.intel.analytics.bigdl.transform.vision.image.{BytesToMat, ImageFeature, ImageFrame} -import org.apache.spark.SparkContext -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.opencv.core.CvType -import scala.language.existentials - -/** - * Definition for image data in a DataFrame - */ -object DLImageSchema { - - /** - * Schema for the image column in a DataFrame. 
Image data is saved in an array of Bytes. - * The format is compatible with Spark Image format in v2.3 - */ - val byteSchema = StructType( - StructField("origin", StringType, true) :: - StructField("height", IntegerType, false) :: - StructField("width", IntegerType, false) :: - StructField("nChannels", IntegerType, false) :: - // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases - StructField("mode", IntegerType, false) :: - // Bytes in OpenCV-compatible order: row-wise BGR in most cases - StructField("data", BinaryType, false) :: Nil) - - /** - * Schema for the image column in a DataFrame. Image data is saved in an array of Floats. - */ - val floatSchema = StructType( - StructField("origin", StringType, true) :: - StructField("height", IntegerType, false) :: - StructField("width", IntegerType, false) :: - StructField("nChannels", IntegerType, false) :: - // OpenCV-compatible type: CV_8UC3, CV_32FC3 in most cases - StructField("mode", IntegerType, false) :: - // floats in OpenCV-compatible order: row-wise BGR in most cases - StructField("data", new ArrayType(FloatType, false), false) :: Nil) - - private[dlframes] def imf2Row(imf: ImageFeature): Row = { - val (mode, data) = if (imf.contains(ImageFeature.imageTensor)) { - val floatData = imf(ImageFeature.imageTensor).asInstanceOf[Tensor[Float]].storage().array() - val cvType = imf.getChannel() match { - case 1 => CvType.CV_32FC1 - case 3 => CvType.CV_32FC3 - case 4 => CvType.CV_32FC4 - case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + - s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") - } - (cvType, floatData) - } else if (imf.contains(ImageFeature.bytes)) { - val bytesData = imf.bytes() - val cvType = imf.getChannel() match { - case 1 => CvType.CV_8UC1 - case 3 => CvType.CV_8UC3 - case 4 => CvType.CV_8UC4 - case other => throw new IllegalArgumentException(s"Unsupported number of channels:" + - s" $other in ${imf.uri()}. Only 1, 3 and 4 are supported.") - } - (cvType, bytesData) - } else { - throw new IllegalArgumentException(s"ImageFeature should have imageTensor or bytes.") - } - - Row( - imf.uri(), - imf.getHeight(), - imf.getWidth(), - imf.getChannel(), - mode, - data - ) - } - - private[dlframes] def row2IMF(row: Row): ImageFeature = { - val (origin, h, w, c) = (row.getString(0), row.getInt(1), row.getInt(2), row.getInt(3)) - val imf = ImageFeature() - imf.update(ImageFeature.uri, origin) - imf.update(ImageFeature.size, (h, w, c)) - val storageType = row.getInt(4) - storageType match { - case CvType.CV_8UC3 | CvType.CV_8UC1 | CvType.CV_8UC4 => - imf.update(ImageFeature.bytes, row.getAs[Array[Byte]](5)) - BytesToMat().transform(imf) - case CvType.CV_32FC3 | CvType.CV_32FC1 | CvType.CV_32FC4 => - val data = row.getSeq[Float](5).toArray - val size = Array(h, w, c) - val ten = Tensor(Storage(data)).resize(size) - imf.update(ImageFeature.imageTensor, ten) - case _ => - throw new IllegalArgumentException(s"Unsupported data type in imageColumn: $storageType") - } - imf - } -} - -/** - * Primary DataFrame-based image loading interface, defining API to read images into DataFrame. - */ -object DLImageReader { - - /** - * DataFrame with a single column of images named "image" (nullable) - */ - private val imageColumnSchema = - StructType(StructField("image", DLImageSchema.byteSchema, true) :: Nil) - - /** - * Read the directory of images into DataFrame from the local or remote source. 
- * - * @param path Directory to the input data files, the path can be comma separated paths as the - * list of inputs. Wildcards path are supported similarly to sc.binaryFiles(path). - * @param sc SparkContext to be used. - * @param minPartitions Number of the DataFrame partitions, - * if omitted uses defaultParallelism instead - * @return DataFrame with a single column "image" of images; - * see DLImageSchema for the details - */ - def readImages(path: String, sc: SparkContext, minPartitions: Int = 1): DataFrame = { - val imageFrame = ImageFrame.read(path, sc, minPartitions) - val rowRDD = imageFrame.toDistributed().rdd.map { imf => - Row(DLImageSchema.imf2Row(imf)) - } - SQLContext.getOrCreate(sc).createDataFrame(rowRDD, imageColumnSchema) - } -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala deleted file mode 100644 index fa728e0eb17..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/DLImageTransformer.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.intel.analytics.bigdl.dlframes - -import com.intel.analytics.bigdl.dataset.Transformer -import com.intel.analytics.bigdl.transform.vision.image.{FeatureTransformer, ImageFeature, MatToTensor} -import org.apache.spark.ml.DLTransformerBase -import org.apache.spark.ml.adapter.{HasInputCol, HasOutputCol, SchemaUtils} -import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row} - -/** - * Provides DataFrame-based API for image pre-processing and feature transformation. - * DLImageTransformer follows the Spark Transformer API pattern and can be used as one stage - * in Spark ML pipeline. - * - * The input column can be either DLImageSchema.byteSchema or DLImageSchema.floatSchema. If - * using DLImageReader, the default format is DLImageSchema.byteSchema - * The output column is always DLImageSchema.floatSchema. - * - * @param transformer Single or a sequence of BigDL FeatureTransformers to be used. E.g. 
- * Resize(256, 256) -> CenterCrop(224, 224) -> - * ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor() - */ -class DLImageTransformer ( - val transformer: Transformer[ImageFeature, ImageFeature], - override val uid: String) - extends DLTransformerBase with HasInputCol with HasOutputCol { - - def this(transformer: FeatureTransformer) = - this(transformer, Identifiable.randomUID("DLImageTransformer")) - - setDefault(inputCol -> "image") - def setInputCol(value: String): this.type = set(inputCol, value) - - setDefault(outputCol -> "output") - def setOutputCol(value: String): this.type = set(outputCol, value) - - protected def validateInputType(inputType: DataType): Unit = { - val validTypes = Array(DLImageSchema.floatSchema, DLImageSchema.byteSchema) - - require(validTypes.exists(t => SchemaUtils.sameType(inputType, t)), - s"Bad input type: $inputType. Requires ${validTypes.mkString(", ")}") - } - - override def transformSchema(schema: StructType): StructType = { - val inputType = schema($(inputCol)).dataType - validateInputType(inputType) - if (schema.fieldNames.contains($(outputCol))) { - throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.") - } - - val outputFields = schema.fields :+ - StructField($(outputCol), DLImageSchema.floatSchema, nullable = false) - StructType(outputFields) - } - - protected override def internalTransform(dataFrame: DataFrame): DataFrame = { - transformSchema(dataFrame.schema, logging = true) - val sc = dataFrame.sqlContext.sparkContext - val localTransformer = this.transformer - val transformerBC = sc.broadcast(localTransformer) - val toTensorBC = sc.broadcast(MatToTensor[Float](shareBuffer = true)) - - val inputColIndex = dataFrame.schema.fieldIndex($(inputCol)) - val resultRDD = dataFrame.rdd.mapPartitions { rowIter => - val localTransformer = transformerBC.value.cloneTransformer() - val toTensorTransformer = toTensorBC.value.cloneTransformer().asInstanceOf[MatToTensor[Float]] - rowIter.map { row => - val imf = DLImageSchema.row2IMF(row.getAs[Row](inputColIndex)) - val output = localTransformer.apply(Iterator(imf)).toArray.head - if (!output.contains(ImageFeature.imageTensor)) { - toTensorTransformer.transform(output) - } - Row.fromSeq(row.toSeq ++ Seq(DLImageSchema.imf2Row(output))) - } - } - - val resultSchema = transformSchema(dataFrame.schema) - dataFrame.sqlContext.createDataFrame(resultRDD, resultSchema) - } -} diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala deleted file mode 100644 index c7737f74de0..00000000000 --- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dlframes/SharedParamsAdapter.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016 The BigDL Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.adapter - -import org.apache.spark.sql.types.{DataType, StructField, StructType} - - -trait HasPredictionCol extends org.apache.spark.ml.param.shared.HasPredictionCol - -trait HasFeaturesCol extends org.apache.spark.ml.param.shared.HasFeaturesCol - -trait HasInputCol extends org.apache.spark.ml.param.shared.HasInputCol - -trait HasOutputCol extends org.apache.spark.ml.param.shared.HasOutputCol - -object SchemaUtils { - - /** - * Appends a new column to the input schema. This fails if the given output column already exists - * @param schema input schema - * @param colName new column name. If this column name is an empty string "", this method returns - * the input schema unchanged. This allows users to disable output columns. - * @param dataType new column data type - * @return new schema with the input column appended - */ - def appendColumn( - schema: StructType, - colName: String, - dataType: DataType, - nullable: Boolean = false): StructType = { - - val colSF = StructField(colName, dataType, nullable) - require(!schema.fieldNames.contains(colSF.name), s"Column ${colSF.name} already exists.") - StructType(schema.fields :+ colSF) - } - - def sameType(a: DataType, b: DataType): Boolean = a.sameType(b) - -} diff --git a/spark/dl/src/test/integration-test.robot b/spark/dl/src/test/integration-test.robot index a82b20225d1..522e0e9043f 100644 --- a/spark/dl/src/test/integration-test.robot +++ b/spark/dl/src/test/integration-test.robot @@ -70,8 +70,6 @@ Run Spark Test Run Shell ${submit} --master ${spark_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 40g --executor-memory 40g --executor-cores 8 --total-executor-cores 8 --class com.intel.analytics.bigdl.example.languagemodel.PTBWordLM ${jar_path} -f ./simple-examples/data -b 120 --numLayers 2 --vocab 10001 --hidden 650 --numSteps 35 --learningRate 0.005 -e 1 --learningRateDecay 0.001 --keepProb 0.5 --overWrite --optimizerVersion "optimizerV2" Log To Console begin resnet Train Run Shell ${submit} --master ${spark_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 5g --executor-memory 5g --executor-cores 8 --total-executor-cores 32 --class com.intel.analytics.bigdl.models.resnet.TrainCIFAR10 ${jar_path} -f /tmp/cifar --batchSize 448 --optnet true --depth 20 --classes 10 --shortcutType A --nEpochs 1 --learningRate 0.1 --optimizerVersion "optimizerV2" - Log To Console begin DLClassifierLeNet - Run Shell ${submit} --master ${spark_master} --executor-cores 16 --total-executor-cores 16 --driver-memory 5g --executor-memory 30g --class com.intel.analytics.bigdl.example.MLPipeline.DLClassifierLeNet ${jar_path} -b 1200 -f /tmp/mnist --maxEpoch 1 Log To Console begin rnn Train Run Shell ${submit} --master ${spark_master} --driver-memory 5g --executor-memory 5g --executor-cores 12 --total-executor-cores 12 --class com.intel.analytics.bigdl.models.rnn.Train ${jar_path} -f ./ -s ./models --nEpochs 1 --checkpoint ./model/ -b 12 --optimizerVersion "optimizerV2" Log To Console begin inceptionV1 train @@ -124,8 +122,6 @@ Yarn Test Suite Set Environment Variable http_proxy ${http_proxy} Set Environment Variable https_proxy ${https_proxy} ${submit}= Catenate SEPARATOR=/ ${spark_home} bin spark-submit - Log To Console begin DLClassifierLeNet - Run Shell ${submit} --master yarn --deploy-mode client --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --executor-cores 10 --num-executors 1 --driver-memory 20g --executor-memory 60g 
--class com.intel.analytics.bigdl.example.MLPipeline.DLClassifierLeNet ${jar_path} -b 1200 -f /tmp/mnist --maxEpoch 1 Log To Console begin text classification Run Shell ${submit} --master yarn --deploy-mode client --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --conf spark.yarn.executor.memoryOverhead=40000 --executor-cores 10 --num-executors 2 --driver-memory 20g --executor-memory 40g --class com.intel.analytics.bigdl.example.textclassification.TextClassifier ${jar_path} --batchSize 240 --baseDir /tmp/text_data --partitionNum 4 Log To Console begin lenet
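For context on what this removal takes away, the deleted dlframes classes composed into a single DataFrame pipeline (read images, pre-process, train, predict). Below is a minimal sketch of that usage, not part of this patch: it only recombines the constructors and setters visible in the deleted sources above, while the SparkSession/SparkContext (`spark`, `sc`), the nn/vision import paths, and the Linear/MSECriterion model and loss choices are assumptions about the surrounding BigDL 0.x API.

// Illustrative sketch only -- not part of this patch.
import com.intel.analytics.bigdl.dlframes.{DLEstimator, DLImageReader, DLImageTransformer}
import com.intel.analytics.bigdl.nn.{Linear, MSECriterion, Sequential}
import com.intel.analytics.bigdl.transform.vision.image.MatToTensor
import com.intel.analytics.bigdl.transform.vision.image.augmentation.{CenterCrop, ChannelNormalize, Resize}

// Image loading and pre-processing: DLImageReader yields an "image" column
// (DLImageSchema.byteSchema); DLImageTransformer writes DLImageSchema.floatSchema.
val imagesDF = DLImageReader.readImages("/path/to/images", sc)   // placeholder path
val preprocessor = new DLImageTransformer(
    Resize(256, 256) -> CenterCrop(224, 224) ->
    ChannelNormalize(123, 117, 104, 1, 1, 1) -> MatToTensor[Float]())
  .setInputCol("image")
  .setOutputCol("imageFeatures")
val processedDF = preprocessor.transform(imagesDF)

// Training and prediction with DLEstimator/DLModel on array-typed feature/label columns,
// using the setters defined on the deleted estimator.
val trainingDF = spark.createDataFrame(Seq(
  (Array(1.0, 2.0), Array(0.5, 1.0)),
  (Array(3.0, 4.0), Array(1.5, 2.0))
)).toDF("features", "label")
val model = Sequential[Float]().add(Linear[Float](2, 2))            // assumed toy model
val estimator = new DLEstimator[Float](model, MSECriterion[Float](), Array(2), Array(2))
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setBatchSize(2)
  .setLearningRate(1e-3)
  .setMaxEpoch(10)
val dlModel = estimator.fit(trainingDF)   // returns a DLModel[Float]
dlModel.setPredictionCol("prediction").transform(trainingDF).show()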