Skip to content

Commit

Permalink
Incremental Training for imagenet (intel-analytics#1391)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiuxin2012 committed Jul 31, 2019
1 parent 267e18f commit 009f854
Showing 1 changed file with 112 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package com.intel.analytics.zoo.pipeline.estimator

import com.intel.analytics.bigdl.Module
import com.intel.analytics.bigdl.dataset.{DistributedDataSet, MiniBatch}
import com.intel.analytics.bigdl.dataset.{DistributedDataSet, MiniBatch, Sample, SampleToMiniBatch}
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.optim.{LBFGS, Loss, SGD, Trigger}
import com.intel.analytics.bigdl.tensor.{Storage, Tensor}
import com.intel.analytics.bigdl.utils.{Engine, LoggerFilter, RandomGenerator}
import com.intel.analytics.zoo.common.NNContext
import com.intel.analytics.zoo.feature.{DistributedDataSetWrapper, DistributedFeatureSet}
import com.intel.analytics.zoo.common._
import com.intel.analytics.zoo.feature.pmem.DISK_AND_DRAM
import com.intel.analytics.zoo.feature.{DistributedDataSetWrapper, DistributedFeatureSet, FeatureSet}
import com.intel.analytics.zoo.pipeline.api.keras.ZooSpecHelper
import com.intel.analytics.zoo.pipeline.api.keras.models.InternalOptimizerUtil
import org.apache.log4j.{Level, Logger}
Expand Down Expand Up @@ -253,4 +254,112 @@ class DistriEstimatorSpec extends ZooSpecHelper {
result
}

"Estimator" should "works fine with DiskFeatureSet nslice = 2" in {
val rdd = sc.parallelize(1 to (256 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(2)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val estimator = Estimator(mm, sgd)
val mse = MSECriterion[Double]()
val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
estimator.train(dset, mse, endTrigger = Some(MaxEpoch(2)))

state[Int]("epoch") should be (3)
}

"Estimator" should "works fine with DiskFeatureSet nslice = 5" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
estimator.train(dset, mse, endTrigger = Some(MaxEpoch(2)))

state[Int]("epoch") should be (3)
}

"Estimator" should "stop with MaxIteration" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
estimator.train(dset, mse, endTrigger = Some(MaxIteration(200)))

state[Int]("neval") should be (201)
state[Int]("epoch") should be (2)
}

"Estimator" should "stop with Or Trigger" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
estimator.train(dset, mse,
endTrigger = Some(Or(MaxIteration(200), MinLoss(0.05f))))

state[Int]("neval") should be <= 201
state[Int]("epoch") should be <= 2
}

"Estimator" should "works with And Trigger" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(5)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
estimator.train(dset, mse,
endTrigger = Some(MaxIteration(200)),
checkPointTrigger = Some(Or(SeveralIteration(80), EveryEpoch())),
validationSet = dset,
validationMethod = Array(new Loss(mse)))

val state = InternalOptimizerUtil.getStateFromOptiMethod(sgd)
state[Int]("neval") should be <= 201
state[Int]("epoch") should be <= 2
}

"Estimator" should "throw exception when cache percentage == 0" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(0)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
intercept[Exception] {
estimator.train(dset, mse, endTrigger = Some(MaxIteration(200)))
}
}

"Estimator" should "evaluate should works fine" in {
val rdd = sc.parallelize(1 to (1024 * nodeNumber), nodeNumber)
.map(_ => Sample[Double](Tensor[Double](4).rand(), Tensor[Double](1).rand()))
val dset = FeatureSet.rdd(rdd, DISK_AND_DRAM(0)) -> SampleToMiniBatch(batchSize = batchSize)
val mm = EstimatorSpecModel.mse
mm.parameters()._1.foreach(_.fill(0.125))
val sgd = new SGD[Double](20)
val mse = MSECriterion[Double]()
val estimator = Estimator(mm, sgd)
estimator.evaluate(dset, Array(new Loss[Double](mse)))
}
}

0 comments on commit 009f854

Please sign in to comment.