From 009f854693aabee2f53db8d00edeab872f942070 Mon Sep 17 00:00:00 2001
From: Xin Qiu
Date: Wed, 31 Jul 2019 13:44:00 +0800
Subject: [PATCH] Incremental Training for imagenet (#1391)

---
 .../estimator/DistriEstimatorSpec.scala       | 115 +++++++++++++++++-
 1 file changed, 112 insertions(+), 3 deletions(-)

diff --git a/spark/dl/src/test/scala/com/intel/analytics/bigdl/pipeline/estimator/DistriEstimatorSpec.scala b/spark/dl/src/test/scala/com/intel/analytics/bigdl/pipeline/estimator/DistriEstimatorSpec.scala
index a6c618a6b8b..13e643e906a 100644
--- a/spark/dl/src/test/scala/com/intel/analytics/bigdl/pipeline/estimator/DistriEstimatorSpec.scala
+++ b/spark/dl/src/test/scala/com/intel/analytics/bigdl/pipeline/estimator/DistriEstimatorSpec.scala
@@ -16,13 +16,14 @@ package com.intel.analytics.zoo.pipeline.estimator
 
 import com.intel.analytics.bigdl.Module
-import com.intel.analytics.bigdl.dataset.{DistributedDataSet, MiniBatch}
+import com.intel.analytics.bigdl.dataset.{DistributedDataSet, MiniBatch, Sample, SampleToMiniBatch}
 import com.intel.analytics.bigdl.nn._
 import com.intel.analytics.bigdl.optim.{LBFGS, Loss, SGD, Trigger}
 import com.intel.analytics.bigdl.tensor.{Storage, Tensor}
 import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, RandomGenerator}
 
-import com.intel.analytics.zoo.common.NNContext
-import com.intel.analytics.zoo.feature.{DistributedDataSetWrapper, DistributedFeatureSet}
+import com.intel.analytics.zoo.common._
+import com.intel.analytics.zoo.feature.pmem.DISK_AND_DRAM
+import com.intel.analytics.zoo.feature.{DistributedDataSetWrapper, DistributedFeatureSet, FeatureSet}
 import com.intel.analytics.zoo.pipeline.api.keras.ZooSpecHelper
 import com.intel.analytics.zoo.pipeline.api.keras.models.InternalOptimizerUtil
 import org.apache.log4j.{Level, Logger}
@@ -253,4 +254,112 @@ class DistriEstimatorSpec extends ZooSpecHelper {
     result
   }
 
+  "Estimator" should "work fine with DiskFeatureSet nslice = 2" in {
+    val rdd = sc.parallelize(1 to (256 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(2)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val estimator = Estimator(mm, sgd)
+    val mse = MSECriterion[Double]()
+    val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
+    estimator.train(dset, mse, endTrigger = Some(MaxEpoch(2)))
+
+    state[Int]("epoch") should be (3)
+  }
+
+  "Estimator" should "work fine with DiskFeatureSet nslice = 5" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
+    estimator.train(dset, mse, endTrigger = Some(MaxEpoch(2)))
+
+    state[Int]("epoch") should be (3)
+  }
+
+  "Estimator" should "stop with MaxIteration" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
+    estimator.train(dset, mse, endTrigger = Some(MaxIteration(200)))
+
+    state[Int]("neval") should be (201)
+    state[Int]("epoch") should be (2)
+  }
+
+  "Estimator" should "stop with Or Trigger" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
+    estimator.train(dset, mse,
+      endTrigger = Some(Or(MaxIteration(200), MinLoss(0.05f))))
+
+    state[Int]("neval") should be <= 201
+    state[Int]("epoch") should be <= 2
+  }
+
+  "Estimator" should "work with And Trigger" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    estimator.train(dset, mse,
+      endTrigger = Some(MaxIteration(200)),
+      checkPointTrigger = Some(Or(SeveralIteration(80), EveryEpoch())),
+      validationSet = dset,
+      validationMethod = Array(new Loss(mse)))
+
+    val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
+    state[Int]("neval") should be <= 201
+    state[Int]("epoch") should be <= 2
+  }
+
+  "Estimator" should "throw exception when cache percentage == 0" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(0)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    intercept[Exception] {
+      estimator.train(dset, mse, endTrigger = Some(MaxIteration(200)))
+    }
+  }
+
+  "Estimator" should "evaluate without error" in {
+    val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
+      .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
+    val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(0)) -> SampleToMiniBatch(batchSize = batchSize)
+    val mm = EstimatorSpecModel.mse
+    mm.parameters()._1.foreach(_.fill(0.125))
+    val sgd = new SGD[Double](20)
+    val mse = MSECriterion[Double]()
+    val estimator = Estimator(mm, sgd)
+    estimator.evaluate(dset, Array(new Loss[Double](mse)))
+  }
 }
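
Note for reviewers (not part of the patch): every test above exercises the same
incremental-training flow this change targets: an RDD of Samples wrapped in a
disk-backed FeatureSet via DISK_AND_DRAM, then trained through Estimator. Below
is a minimal standalone sketch of that flow outside the test harness, using only
the APIs the diff itself imports and calls; the Linear model, app name, batch
size, and slice count are illustrative assumptions, not part of the patch:

  import com.intel.analytics.bigdl.dataset.{Sample, SampleToMiniBatch}
  import com.intel.analytics.bigdl.nn.{Linear, MSECriterion, Sequential}
  import com.intel.analytics.bigdl.optim.SGD
  import com.intel.analytics.bigdl.tensor.Tensor
  import com.intel.analytics.zoo.common._
  import com.intel.analytics.zoo.feature.FeatureSet
  import com.intel.analytics.zoo.feature.pmem.DISK_AND_DRAM
  import com.intel.analytics.zoo.pipeline.estimator.Estimator

  object IncrementalTrainingSketch {
    def main(args: Array[String]): Unit = {
      val sc = NNContext.initNNContext("incremental-training-sketch")
      // Random 4-feature samples stand in for a real (e.g. ImageNet) dataset.
      val samples = sc.parallelize(1 to 1024)
        .map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
      // DISK_AND_DRAM(2) splits the cached data into two slices and holds one
      // slice in DRAM at a time, so a dataset larger than memory can still be
      // iterated epoch by epoch.
      val trainSet = FeatureSet.rdd(samples, DISK_AND_DRAM(2)) ->
        SampleToMiniBatch(batchSize = 32)
      val model = Sequential[Double]().add(Linear[Double](4, 1))
      val estimator = Estimator(model, new SGD[Double](0.01))
      // Same train signature the tests use: criterion plus an end trigger.
      estimator.train(trainSet, MSECriterion[Double](),
        endTrigger = Some(MaxEpoch(2)))
      sc.stop()
    }
  }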