Commit
Support BatchNorm (bn) when TorchModel exports a model to pure PyTorch (intel-analytics#2480)

* support bn

* meet code review

* fix style check

* fix ut

* fix ut

* fix ut

* some update

* fix ut

* fix unit test

* fix style check

* some update

* update comments
qiuxin2012 committed Jul 9, 2020
1 parent f05594e commit 8c2f096
Showing 3 changed files with 137 additions and 49 deletions.
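The change hinges on a PyTorch detail: BatchNorm layers keep their running statistics (running_mean, running_var, num_batches_tracked) in buffers rather than in trainable parameters, so syncing parameters() alone drops that state when the model is exported or gathered back from workers. A minimal sketch, not part of the commit, showing where the BatchNorm state actually lives:

    import torch

    bn = torch.nn.BatchNorm1d(4)
    print([name for name, _ in bn.named_parameters()])
    # ['weight', 'bias'] -- the trainable parameters
    print([name for name, _ in bn.named_buffers()])
    # ['running_mean', 'running_var', 'num_batches_tracked'] -- the extra state this commit transfers

The commit exposes those buffers as "extra parameters" on TorchModel and carries them through load, evaluate, and the optimizer's model-gathering path.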
@@ -35,7 +35,8 @@ class TorchModel private(private val modelHolder: TorchModel2Holder, init_weight
extends AbstractModule[Activity, Activity, Float]{
import TorchModel._

protected lazy val loaded = {
protected var loaded = false
protected lazy val load = {
PythonInterpreter.set("model_bytes", modelHolder.torchBytes)
val loadModelCode =
s"""
@@ -50,6 +51,10 @@ class TorchModel private(private val modelHolder: TorchModel2Holder, init_weight
|${getName()} = CloudPickleSerializer.loads(CloudPickleSerializer, by)
|""".stripMargin
PythonInterpreter.exec(loadModelCode)
if (extraParams.length != 0) {
setExtraParam(extraParams)
}
loaded = true
true
}
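For reference, the embedded load code above boils down to deserializing the pickled model bytes inside the Python interpreter; a standalone sketch, assuming plain cloudpickle rather than the interpreter-side CloudPickleSerializer used by the commit:

    import cloudpickle

    def load_model(model_bytes):
        # model_bytes plays the role of modelHolder.torchBytes pushed into the interpreter
        return cloudpickle.loads(model_bytes)

After the Python model is loaded, the Scala side now also restores any previously captured extra parameters (the BatchNorm buffers) and sets the loaded flag so later calls know a live Python model is available.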

@@ -73,7 +78,7 @@ class TorchModel private(private val modelHolder: TorchModel2Holder, init_weight
|""".stripMargin

override def updateOutput(input: Activity): Activity = {
loaded
load
// TODO: delete this time counting
val startTime = System.nanoTime()
// _data comes from FeatureSet.
@@ -118,7 +123,7 @@ class TorchModel private(private val modelHolder: TorchModel2Holder, init_weight
}

override def updateGradInput(input: Activity, gradOutput: Activity): Activity = {
loaded
load
val startTime = System.nanoTime()
val backwardCode =
s"""
@@ -162,16 +167,54 @@ class TorchModel private(private val modelHolder: TorchModel2Holder, init_weight

override def evaluate(): this.type = {
super.evaluate()
loaded
load
PythonInterpreter.set("newWeight", new NDArray[Array[Float]](weights.storage().array()))
PythonInterpreter.exec(setWeightCode)
PythonInterpreter.exec(s"${getName()}.eval()")
this
}
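Calling eval() matters for BatchNorm: it switches the layer from batch statistics to the running statistics stored in its buffers. An illustrative sketch, not taken from the commit:

    import torch

    bn = torch.nn.BatchNorm1d(4)
    x = torch.randn(8, 4)

    bn.train()
    _ = bn(x)  # normalizes with batch statistics and updates running_mean / running_var
    bn.eval()
    y = bn(x)  # normalizes with the stored running statistics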

protected var extraParams: Array[Tensor[Float]] = Array()
override def getExtraParameter(): Array[Tensor[Float]] = {
if (loaded) {
val getExtraParamCode =
s"""
|${getName()}_extra_parameters = []
|for named_buffer in ${this.getName()}.named_buffers():
| ${getName()}_extra_parameters.append(named_buffer[1].data.numpy())
|""".stripMargin
PythonInterpreter.exec(getExtraParamCode)
val extraParams = PythonInterpreter.getValue[AnyRef](s"${getName()}_extra_parameters")
PythonFeatureSet.toArrayTensor(extraParams)
} else {
extraParams
}
}
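The embedded snippet above walks the model's named_buffers() and exports each buffer as a numpy array. A standalone sketch of the same idea, assuming a model that contains a BatchNorm layer:

    import torch

    model = torch.nn.Sequential(torch.nn.Linear(4, 4), torch.nn.BatchNorm1d(4))
    extra_parameters = []
    for named_buffer in model.named_buffers():
        extra_parameters.append(named_buffer[1].data.numpy())
    # for this model: running_mean, running_var and num_batches_tracked as numpy arrays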

// TODO: change to override setExtraParameter when switching to BigDL 0.11.0
private[zoo] def setExtraParam(extraParams: Array[Tensor[Float]]): this.type = {
if (loaded) {
val params = extraParams.map(param => new NDArray[Array[Float]](param.storage().array()))
val paramName = s"${getName()}_new_extra_param"
val idxName = s"${getName()}_buffer_idx"
PythonInterpreter.set(paramName, params)
val setExtraParamCode =
s"""
|${idxName} = 0
|for named_buffer in ${this.getName()}.named_buffers():
| named_buffer[1].copy_(
| torch.reshape(torch.Tensor(${paramName}[${idxName}]), named_buffer[1].size()))
| ${idxName} += 1
|""".stripMargin
PythonInterpreter.exec(setExtraParamCode)
}
this.extraParams = extraParams
this
}
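setExtraParam writes the arrays back in the same named_buffers() iteration order, reshaping each flat array to the buffer's size. A standalone sketch of the round trip, outside the Scala/Py4J plumbing:

    import torch

    model = torch.nn.Sequential(torch.nn.Linear(4, 4), torch.nn.BatchNorm1d(4))
    new_extra_param = [b.data.numpy().flatten() for _, b in model.named_buffers()]

    buffer_idx = 0
    for named_buffer in model.named_buffers():
        named_buffer[1].copy_(
            torch.reshape(torch.Tensor(new_extra_param[buffer_idx]), named_buffer[1].size()))
        buffer_idx += 1

Getting and setting must enumerate the buffers in the same order, which holds as long as the module structure is unchanged between the two calls.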

override def training(): this.type = {
super.training()
loaded
load
PythonInterpreter.exec(s"${getName()}.train()")
this
}
@@ -164,6 +164,10 @@ class PythonZooNet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo
pids.asScala.foreach(pid => processToBeKill.add(pid + ""))
}

def getModuleExtraParameters(model: AbstractModule[_, _, T]): Array[JTensor] = {
model.getExtraParameter().map(toJTensor)
}

def createTorchNet(modelPath: String): TorchNet = {
TorchNet(modelPath)
}
@@ -47,7 +47,7 @@ import com.intel.analytics.zoo.pipeline.api.autograd.{Lambda, Variable}
import com.intel.analytics.zoo.pipeline.api.autograd._
import com.intel.analytics.zoo.pipeline.api.keras.layers.Input
import com.intel.analytics.zoo.pipeline.api.keras.layers.utils._
import com.intel.analytics.zoo.pipeline.api.net.{NetUtils, TorchNet}
import com.intel.analytics.zoo.pipeline.api.net.{NetUtils, TorchModel, TorchNet}
import com.intel.analytics.zoo.pipeline.estimator.{AbstractEstimator, ConstantClipping, GradientClipping, L2NormClipping}
import com.intel.analytics.zoo.tfpark.{TFTrainingHelper, TFTrainingHelperV2}
import org.apache.commons.lang.exception.ExceptionUtils
@@ -1032,6 +1032,42 @@ private[zoo] object InternalOptimizerUtil {
args: _*)
}

// TODO: Delete this when switching to BigDL 0.11.0.
def getTorchModel[T: ClassTag](
models: RDD[Cache[T]],
parameters: AllReduceParameter[T],
trainingModel: TorchModel)(implicit ev: TensorNumeric[T]): TorchModel = {
val partitionNum = models.partitions.length
val extraState = models.map(_.localModels.head.getExtraParameter()).first()
trainingModel.setExtraParam(extraState.asInstanceOf[Array[Tensor[Float]]])
val (weights, gradients) = models.mapPartitions(iter => {
val cached = iter.next()
val curPartitionId = TaskContext.getPartitionId()
Iterator.single((Map(curPartitionId -> parameters.weightPartition),
Map(curPartitionId -> parameters.gradientPartition)))
}).reduce((a, b) => (a._1 ++ b._1, a._2 ++ b._2))

val parameterArray = trainingModel.parameters()
(0 until parameterArray._2.length).foreach(i =>
parameterArray._2(i).resizeAs(parameterArray._1(i))
)
val (parameter, gradientParameter) = getParametersFromModel(trainingModel)
val parameterLength = parameter.nElement()
val taskSize = parameterLength / partitionNum
require(taskSize != 0, "parameter length should not less than partition number")
val extraSize = parameterLength % partitionNum

(0 until partitionNum).map(pid => {
val start = pid * taskSize + math.min(pid, extraSize)
val length = taskSize + (if (pid < extraSize) 1 else 0)
parameter.narrow(1, start + 1, length).copy(weights(pid).asInstanceOf[Tensor[Float]])
gradientParameter.narrow(1, start + 1, length)
.copy(gradients(pid).asInstanceOf[Tensor[Float]])
})

trainingModel
}
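getTorchModel copies each partition's slice of the flat weight and gradient storage back into the driver-side model; the slice bounds follow an even split, with the remainder spread over the first parameterLength % partitionNum partitions. A small sketch of that arithmetic with hypothetical names:

    def partition_ranges(parameter_length, partition_num):
        # mirrors taskSize / extraSize / start / length in the Scala code above
        # (starts are 0-based here; the Tensor.narrow call above is 1-based)
        task_size = parameter_length // partition_num
        extra_size = parameter_length % partition_num
        ranges = []
        for pid in range(partition_num):
            start = pid * task_size + min(pid, extra_size)
            length = task_size + (1 if pid < extra_size else 0)
            ranges.append((start, length))
        return ranges

    print(partition_ranges(10, 3))  # [(0, 4), (4, 3), (7, 3)]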

def releaseBroadcast[T: ClassTag](
uuid: String)(implicit ev: TensorNumeric[T]): Unit = {
KerasUtils.invokeMethodWithEv(
@@ -1391,7 +1427,6 @@ private[zoo] class InternalDistriOptimizer[T: ClassTag] (
): Map[ValidationMethod[T], ValidationResult] = {
val validateRDD = validationSet.toDistributed().data(train = false)
val sc = validateRDD.sparkContext
val cachedModels = InternalOptimizerUtil.getModelCacheFromOptimizer(this)

val coresPerNode = EngineRef.getCoreNumber()
val _subModelNumber = EngineRef.getEngineType() match {
@@ -1418,7 +1453,7 @@
}
} else {
val bcVMethods = validateRDD.sparkContext.broadcast(validationMethod)
val bcModel = ModelBroadcast[T]().broadcast(sc, model)
val bcModel = ModelBroadcast[T]().broadcast(sc, _model)
validateRDD.mapPartitions{_ =>
Iterator.single(Cache[T](
Array.tabulate(_subModelNumber)(_ => bcModel.value()),
@@ -1554,47 +1589,53 @@ object InternalDistriOptimizer {
trainingModel: Module[T])(implicit ev: TensorNumeric[T])
: Module[T] = {

if (!trainingModel.isInstanceOf[TFTrainingHelperV2]) {
InternalOptimizerUtil.getModel(models, parameters, trainingModel)
} else {
val partitionNum = models.partitions.length
models.mapPartitions(iter => {
iter.next().localModels.head.asInstanceOf[TFTrainingHelperV2].moveWeightsOutOfTF()
Iterator.single(1)
}).reduce(_ + _)
val extraState = models.map(_.localModels.head.getExtraParameter()).first()
trainingModel.setExtraParameter(extraState)

// make sure gradient is as the same length as weight
val parameterArray = trainingModel.parameters()
(0 until parameterArray._2.length).foreach(i =>
parameterArray._2(i).resizeAs(parameterArray._1(i))
)

val (parameter, gradientParameter) =
InternalOptimizerUtil.getParametersFromModel(trainingModel)


val (weights, gradients) = models.mapPartitions(iter => {
val cached = iter.next()
val curPartitionId = TaskContext.getPartitionId()
val (offset, size) = InternalOptimizerUtil.getLocalPartitionRangeFromParameters(parameters)
val weightTensor = Tensor[T](size)
weightTensor.copy(cached.modelWeights.head.narrow(1, offset, size))
Iterator.single((Map(curPartitionId -> weightTensor),
Map(curPartitionId -> parameters.gradientPartition)))
}).reduce((a, b) => (a._1 ++ b._1, a._2 ++ b._2))

val taskSize = parameters.size / partitionNum
require(taskSize != 0, "parameter length should not less than partition number")
val extraSize = parameters.size % partitionNum

(0 until partitionNum).map(pid => {
val start = parameters.paramOffset + pid * taskSize + math.min(pid, extraSize)
val length = taskSize + (if (pid < extraSize) 1 else 0)
parameter.narrow(1, start, length).copy(weights(pid))
gradientParameter.narrow(1, start, length).copy(gradients(pid))
})
trainingModel match {
case _: TFTrainingHelperV2 =>
val partitionNum = models.partitions.length
models.mapPartitions(iter => {
iter.next().localModels.head.asInstanceOf[TFTrainingHelperV2].moveWeightsOutOfTF()
Iterator.single(1)
}).reduce(_ + _)
val extraState = models.map(_.localModels.head.getExtraParameter()).first()
trainingModel.setExtraParameter(extraState)

// make sure gradient is as the same length as weight
val parameterArray = trainingModel.parameters()
(0 until parameterArray._2.length).foreach(i =>
parameterArray._2(i).resizeAs(parameterArray._1(i))
)

val (parameter, gradientParameter) =
InternalOptimizerUtil.getParametersFromModel(trainingModel)


val (weights, gradients) = models.mapPartitions(iter => {
val cached = iter.next()
val curPartitionId = TaskContext.getPartitionId()
val (offset, size) =
InternalOptimizerUtil.getLocalPartitionRangeFromParameters(parameters)
val weightTensor = Tensor[T](size)
weightTensor.copy(cached.modelWeights.head.narrow(1, offset, size))
Iterator.single((Map(curPartitionId -> weightTensor),
Map(curPartitionId -> parameters.gradientPartition)))
}).reduce((a, b) => (a._1 ++ b._1, a._2 ++ b._2))

val taskSize = parameters.size / partitionNum
require(taskSize != 0, "parameter length should not less than partition number")
val extraSize = parameters.size % partitionNum

(0 until partitionNum).map(pid => {
val start = parameters.paramOffset + pid * taskSize + math.min(pid, extraSize)
val length = taskSize + (if (pid < extraSize) 1 else 0)
parameter.narrow(1, start, length).copy(weights(pid))
gradientParameter.narrow(1, start, length).copy(gradients(pid))
})
case model: TorchModel =>
// TODO: delete this when switching to BigDL 0.11.0 and TorchModel overrides setExtraParameters
InternalOptimizerUtil.getTorchModel(
models, parameters, model)
case _ =>
InternalOptimizerUtil.getModel(models, parameters, trainingModel)
}
trainingModel
}
