From 684909d65791d4a23dcccf618b891e099b52b318 Mon Sep 17 00:00:00 2001
From: Xin Qiu
Date: Mon, 9 Sep 2019 09:38:44 +0800
Subject: [PATCH] Save Keras-like model to pure keras or tensorflow protobuf.
 (#1600)

* checkpoint
* some update
* refine api
* some update
* fix build fail
* meet code review
* style check
* fix typo
* fix style check
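
For reference, a minimal usage sketch of the new API (the model built here is
illustrative and not part of this patch; both calls shell out to the given
python executable, which needs analytics-zoo and tensorflow installed):

    import com.intel.analytics.bigdl.utils.Shape
    import com.intel.analytics.zoo.pipeline.api.keras.layers.Dense
    import com.intel.analytics.zoo.pipeline.api.keras.models.Sequential

    val model = Sequential[Float]()
    model.add(Dense[Float](outputDim = 2, inputShape = Shape(3)))
    // writes an inference-only Keras 2 HDF5 file, loadable by tf.keras
    model.saveToKeras2[Float]("/tmp/model.h5")
    // exports an inference-only TensorFlow protobuf via zoo's export_tf
    model.saveToTf[Float]("/tmp/model_tf")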
---
 .../analytics/bigdl/dllib/inference/Net.scala | 253 +++++++++++++++++-
 .../bigdl/dllib/zooKeras/layers/Dense.scala   |  18 ++
 .../bigdl/dllib/zooKeras/layers/Dropout.scala |  11 +-
 .../bigdl/dllib/zooKeras/layers/LSTM.scala    |  35 +++
 .../bigdl/dllib/zooKeras/layers/Permute.scala |   7 +
 .../bigdl/dllib/zooKeras/layers/Reshape.scala |   7 +
 .../zooKeras/layers/utils/KerasUtils.scala    |  28 +-
 .../dllib/zooKeras/models/Topology.scala      |  36 +++
 8 files changed, 390 insertions(+), 5 deletions(-)

diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/inference/Net.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/inference/Net.scala
index c92d031f226..5b59e16c4ac 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/inference/Net.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/inference/Net.scala
@@ -16,21 +16,33 @@
 package com.intel.analytics.zoo.pipeline.api
 
+import java.io.{BufferedReader, BufferedWriter, FileOutputStream, FileWriter, InputStreamReader, File => JFile}
 import java.nio.ByteOrder
+import java.util
 
+import com.intel.analytics.bigdl.Module
 import com.intel.analytics.bigdl.nn.Graph._
 import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity, Initializable}
-import com.intel.analytics.bigdl.nn.keras.KerasLayer
+import com.intel.analytics.bigdl.nn.keras.{KerasIdentityWrapper, KerasLayer}
 import com.intel.analytics.bigdl.nn.{Container, Graph, InitializationMethod}
+import com.intel.analytics.bigdl.nn.{Sequential => TSequential}
+import com.intel.analytics.bigdl.python.api.PythonBigDL
+import com.intel.analytics.bigdl.tensor.Tensor
 import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
-import com.intel.analytics.bigdl.utils.File
+import com.intel.analytics.bigdl.utils.{File, Shape}
 import com.intel.analytics.zoo.models.caffe.CaffeLoader
 import com.intel.analytics.bigdl.utils.serializer.ModuleLoader
 import com.intel.analytics.bigdl.utils.tf.{Session, TensorflowLoader}
+import com.intel.analytics.zoo.common.Utils
 import com.intel.analytics.zoo.pipeline.api.autograd.Variable
-import com.intel.analytics.zoo.pipeline.api.keras.layers.WordEmbedding
+import com.intel.analytics.zoo.pipeline.api.keras.layers.{KerasLayerWrapper, WordEmbedding}
+import com.intel.analytics.zoo.pipeline.api.keras.layers.utils.KerasUtils
 import com.intel.analytics.zoo.pipeline.api.keras.models.{KerasNet, Model, Sequential}
 import com.intel.analytics.zoo.pipeline.api.net.{GraphNet, NetUtils}
+import org.apache.commons.io.FileUtils
+import org.apache.log4j.Logger
+import org.apache.spark.bigdl.api.python.BigDLSerDe
+import org.apache.zookeeper.KeeperException.UnimplementedException
 
 import scala.reflect.ClassTag
 
@@ -53,6 +65,23 @@ trait Net {
     new Variable(
       this.asInstanceOf[AbstractModule[Activity, Activity, T]].inputs(vars.map(_.node): _*))
   }
+
+  private[zoo] def toKeras2(dir: String): String = {
+    throw new UnimplementedException()
+  }
+
+  /**
+   * Get Keras-like weights.
+   * @return the module's weight tensors, converted to Float and arranged
+   *         in Keras layout.
+   */
+  private[zoo] def getKerasWeights(): Array[Tensor[Float]] = {
+    if (this.asInstanceOf[AbstractModule[_, _, _]].parameters()._1.length != 0) {
+      throw new UnimplementedException()
+    } else {
+      Array()
+    }
+  }
 }
 
 object Net {
@@ -186,4 +215,222 @@
       implicit ev: TensorNumeric[T]): Session[T] = {
     TensorflowLoader.checkpoints(graphFile, binFile, byteOrder)
   }
+
+  private[zoo] def saveToKeras2[T: ClassTag](
+      model: Net,
+      filePath: String,
+      python: String = "python")(implicit ev: TensorNumeric[T]): Unit = {
+    NetSaver.saveToKeras2(model.asInstanceOf[Module[T]], filePath, python)
+  }
+
+  private[zoo] def saveToTf[T: ClassTag](
+      model: Net,
+      dir: String,
+      python: String = "python")(implicit ev: TensorNumeric[T]): Unit = {
+    NetSaver.saveToTf(model.asInstanceOf[Module[T]], dir, python)
+  }
+
+  private[zoo] def getName(name: String): String = {
+    name.split("\\.").last
+  }
+
+  private[zoo] def inputShapeToString(
+      inputShape: Shape,
+      paramName: String = "input_shape"): Map[String, String] = {
+    if (inputShape != null) {
+      Map(paramName -> s"(${inputShape.toSingle().mkString(", ")},)")
+    } else {
+      Map()
+    }
+  }
+
+  private[zoo] def arrayToString(array: Seq[Int], name: String): Map[String, String] = {
+    Map(name -> s"(${array.mkString(", ")})")
+  }
+
+  private[zoo] def activationToString(
+      activation: AbstractModule[_, _, _],
+      paramName: String = "activation"): Map[String, String] = {
+    if (activation == null) {
+      Map()
+    } else {
+      val trueActivation = activation match {
+        case wrapper: KerasIdentityWrapper[_] => wrapper.layer
+        case _ => activation
+      }
+      Map(paramName -> s"'${KerasUtils.getActivationName(trueActivation)}'")
+    }
+  }
+
+  private[zoo] def param(
+      boolean: Boolean,
+      paramName: String): Map[String, String] = {
+    Map(paramName -> s"${if (boolean) "True" else "False"}")
+  }
+
+  private[zoo] def param(
+      integer: Int,
+      paramName: String): Map[String, String] = {
+    Map(paramName -> integer.toString)
+  }
+
+  private[zoo] def param(
+      double: Double,
+      paramName: String): Map[String, String] = {
+    Map(paramName -> double.toString)
+  }
+
+  private[zoo] def param(
+      name: String,
+      paramName: String = "name"): Map[String, String] = {
+    Map(paramName -> s"'$name'")
+  }
+
+  private[zoo] def kerasDef(
+      module: Module[_],
+      params: Map[String, String]): String = {
+    s"${Net.getName(module.getClass.getName)}(" +
+      params.map(v => s"${v._1}=${v._2}").mkString(", ") + ")"
+  }
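+
+  // For illustration (hypothetical layer values; keyword order may vary because
+  // the params Map is unordered): a Dense layer named 'dense_1' with units = 2
+  // and inputShape = Shape(3) renders roughly as
+  //   Dense(input_shape=(3,), name='dense_1', use_bias=True, units=2)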
+
+  protected object NetSaver {
+    private val logger = Logger.getLogger(getClass)
+
+    protected val header =
+      """
+        |from tensorflow.keras.models import Sequential
+        |from tensorflow.keras.layers import *
+        |from pyspark.serializers import PickleSerializer
+        |
+        |def load_to_numpy(file):
+        |    in_file = open(file, "rb")
+        |    data = in_file.read()
+        |    in_file.close()
+        |    r = PickleSerializer().loads(data, encoding="bytes")
+        |    return r.to_ndarray()
+      """.stripMargin + "\n"
+
+    protected val tfHeader =
+      """
+        |from zoo.util.tf import export_tf
+        |from tensorflow.keras import backend as K
+        |import tensorflow as tf
+      """.stripMargin + "\n"
+
+    def save[T: ClassTag](
+        m: Module[T],
+        path: String,
+        python: String,
+        saveCommand: String)
+      (implicit ev: TensorNumeric[T]): Unit = {
+      val tmpDir = Utils.createTmpDir("ZooKeras")
+      logger.info(s"Write model's temp file to ${tmpDir}")
+      val modelFile = tmpDir.toString + s"/${m.getName()}.py"
+      val bw = new BufferedWriter(new FileWriter(modelFile))
+      bw.write(header)
+      if (m.isInstanceOf[Sequential[T]]) {
+        export(m.asInstanceOf[Sequential[T]], tmpDir.toString, bw)
+      } else {
+        throw new IllegalArgumentException(s"${m.getClass.getName} is not supported.")
+      }
+      bw.write(saveWeights(m, tmpDir.toString))
+      bw.write(saveCommand)
+      bw.flush()
+      bw.close()
+      execCommand(s"${python} ${modelFile}")
+      FileUtils.deleteDirectory(tmpDir.toFile())
+    }
+
+    def saveToTf[T: ClassTag](m: Module[T], path: String, python: String)
+      (implicit ev: TensorNumeric[T]): Unit = {
+      val saveCommand = tfHeader +
+        s"export_tf(K.get_session(), '${path}', model.inputs, model.outputs)\n"
+      save(m, path, python, saveCommand)
+    }
+
+    def saveToKeras2[T: ClassTag](m: Module[T], path: String, python: String)
+      (implicit ev: TensorNumeric[T]): Unit = {
+      save(m, path, python, s"model.save('$path')\n")
+    }
+
+    def execCommand(command: String): Unit = {
+      val proc = Runtime.getRuntime().exec(command)
+      proc.waitFor()
+      if (proc.exitValue() != 0) {
+        val error = new BufferedReader(new InputStreamReader(proc.getErrorStream()))
+        val errorMsg = new StringBuilder()
+        var line = error.readLine()
+        while (line != null) {
+          errorMsg.append(line + "\n")
+          line = error.readLine()
+        }
+        throw new RuntimeException(s"Export model failed:\n" + errorMsg.toString())
+      }
+    }
+
+    def export[T: ClassTag](
+        sequential: Sequential[T],
+        path: String,
+        writer: BufferedWriter): Unit = {
+      writer.write(s"${sequential.getName()} = " +
+        s"Sequential(name='${sequential.getName()}')\n")
+      val modules = sequential.modules(0).asInstanceOf[TSequential[T]].modules
+      modules.foreach { module =>
+        if (module.isInstanceOf[Sequential[T]]) {
+          export(module.asInstanceOf[Sequential[T]], path, writer)
+          writer.write(s"${sequential.getName()}.add(${module.getName})\n")
+        } else if (module.isInstanceOf[Net]) {
+          writer.write(s"${module.getName()} = ${module.asInstanceOf[Net].toKeras2(path)}\n")
+          writer.write(s"${sequential.getName()}.add(${module.getName})\n")
+        } else {
+          throw new IllegalArgumentException(s"unknown module type ${module.getClass.getName}")
+        }
+      }
+    }
+
+    private[zoo] def saveWeights[T: ClassTag](
+        module: AbstractModule[_, _, T], path: String)
+      (implicit ev: TensorNumeric[T]): String = {
+      val moduleName = module.getName()
+      var i = 0
+      val wStrings = module.asInstanceOf[Net].getKerasWeights().map { p =>
+        val pName = s"${moduleName}_p${i}"
+        val pPath = getUniqueFile(s"${path}/${pName}")
+        saveToJTensor(p, pPath)
+        i += 1
+        (s"${pName} = load_to_numpy('${pPath}')",
+          pName)
+      }
+      val loadWeights = wStrings.map(_._1).mkString("\n")
+      val weightsList = wStrings.map(_._2).mkString(",")
+      loadWeights + "\n" +
+        s"${moduleName}.set_weights([${weightsList}])\n"
+    }
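+
+    // Note on the round trip above: each weight tensor is pickled to disk as a
+    // JTensor via BigDLSerDe (see saveToJTensor below), and the generated Python
+    // script reads it back with load_to_numpy() from `header` before passing the
+    // resulting numpy arrays to set_weights.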
+
+    private def getUniqueFile(path: String): JFile = {
+      var file = new JFile(path)
+      var i = 0
+      while (file.exists()) {
+        file = new JFile(path + s".$i")
+        i += 1
+      }
+      file
+    }
+
+    private def saveToJTensor[T: ClassTag](
+        tensor: Tensor[T], file: JFile)
+      (implicit ev: TensorNumeric[T]): Unit = {
+      val pythonBigDL = new PythonBigDL[T]()
+      val jt = pythonBigDL.toJTensor(tensor)
+      val bytes = BigDLSerDe.dumps(jt)
+      val fio = new FileOutputStream(file)
+      fio.write(bytes)
+      fio.flush()
+      fio.close()
+    }
+
+  }
 }
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dense.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dense.scala
index 03c57eda8f7..177b7c85454 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dense.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dense.scala
@@ -59,6 +59,24 @@ class Dense[T: ClassTag](
     override val inputShape: Shape = null)(implicit ev: TensorNumeric[T])
   extends BigDLDense[T](outputDim, init, activation, wRegularizer, bRegularizer, bias,
     inputShape) with Net {
+
+  override private[zoo] def toKeras2(dir: String): String = {
+    val params = Net.inputShapeToString(inputShape) ++
+      Net.activationToString(activation) ++
+      Net.param(getName()) ++
+      Net.param(bias, "use_bias") ++
+      Net.param(outputDim, "units")
+    Net.kerasDef(this, params)
+  }
+
+  override private[zoo] def getKerasWeights(): Array[Tensor[Float]] = {
+    val weights = this.parameters()._1
+    val kWeights = Array.tabulate(weights.length)(_ => Tensor[Float]())
+    // Keras stores the dense kernel transposed relative to BigDL's Linear weight
+    weights(0) = weights(0).t().contiguous()
+    weights(0).cast[Float](kWeights(0).resizeAs(weights(0)))
+    weights(1).cast[Float](kWeights(1).resizeAs(weights(1)))
+    kWeights
+  }
 }
 
 object Dense {
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dropout.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dropout.scala
index 702483860cd..57b6a1429e0 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dropout.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Dropout.scala
@@ -36,7 +36,16 @@ class Dropout[T: ClassTag](
     override val p: Double,
     override val inputShape: Shape = null)
   (implicit ev: TensorNumeric[T])
-  extends com.intel.analytics.bigdl.nn.keras.Dropout[T](p, inputShape) with Net {}
+  extends com.intel.analytics.bigdl.nn.keras.Dropout[T](p, inputShape) with Net {
+
+  override private[zoo] def toKeras2(dir: String): String = {
+    val params = Net.inputShapeToString(inputShape) ++
+      Net.param(getName()) ++
+      Net.param(p, "rate")
+    Net.kerasDef(this, params)
+  }
+
+}
 
 object Dropout {
   def apply[@specialized(Float, Double) T: ClassTag](
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/LSTM.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/LSTM.scala
index af35e9a7859..00bacfd22ee 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/LSTM.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/LSTM.scala
@@ -78,6 +78,41 @@ class LSTM[T: ClassTag](
       uRegularizer = uRegularizer,
       bRegularizer = bRegularizer)
   }
+
+  override private[zoo] def toKeras2(dir: String): String = {
+    val params = Net.inputShapeToString(inputShape) ++
+      Net.activationToString(activation) ++
+      Net.activationToString(innerActivation, "recurrent_activation") ++
+      Net.param(returnSeq, "return_sequences") ++
+      Net.param(outputDimension, "units") ++
+      Net.param(getName())
+    Net.kerasDef(this, params)
+  }
+
+  override private[zoo] def getKerasWeights(): Array[Tensor[Float]] = {
+    val weights = this.parameters()._1
+    val kWeights = Array.tabulate(weights.length)(_ => Tensor[Float]())
+    weights(0) = weights(0).t().contiguous()
+    weights(2) = weights(2).t().contiguous()
+    weights(0).cast[Float](kWeights(0).resizeAs(weights(0)))
+    weights(2).cast[Float](kWeights(1).resizeAs(weights(2)))
+    weights(1).cast[Float](kWeights(2).resizeAs(weights(1)))
+    // reorder the packed gate blocks to match Keras' weight layout
+    switch(kWeights(0), 2)
+    switch(kWeights(1), 2)
+    switch(kWeights(2), 1)
+
+    kWeights
+  }
+
+  private def switch(t: Tensor[Float], dim: Int): Unit = {
+    // swap the second and third gate blocks along `dim`; the clone of the
+    // first block only allocates a buffer of the right size
+    val tmpWeight = t.narrow(dim, 1, outputDimension).clone()
+    tmpWeight.copy(t.narrow(dim, 1 + outputDimension, outputDimension))
+    t.narrow(dim, 1 + outputDimension, outputDimension)
+      .copy(t.narrow(dim, 2 * outputDimension + 1, outputDimension))
+    t.narrow(dim, 2 * outputDimension + 1, outputDimension).copy(tmpWeight)
+  }
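+
+  // Illustration of switch (assumed gate layout): with units = 2, a weight packed
+  // as four gate blocks [g1 | g2 | g3 | g4] along `dim` becomes [g1 | g3 | g2 | g4],
+  // i.e. the second and third blocks are swapped.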
 }
 
 object LSTM {
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Permute.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Permute.scala
index b0d634176f5..95572521551 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Permute.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Permute.scala
@@ -40,6 +40,13 @@ class Permute[T: ClassTag](
     override val inputShape: Shape = null)(implicit ev: TensorNumeric[T])
   extends BigDLPermute[T](
     dims, inputShape) with Net {
+
+  override private[zoo] def toKeras2(dir: String): String = {
+    val params = Net.inputShapeToString(inputShape) ++
+      Net.param(getName()) ++
+      Net.arrayToString(dims, "dims")
+    Net.kerasDef(this, params)
+  }
 }
 
 object Permute {
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Reshape.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Reshape.scala
index 96d229445f3..a1e97198c74 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Reshape.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/Reshape.scala
@@ -98,6 +98,13 @@ class Reshape[T: ClassTag](
     }
     layer.asInstanceOf[AbstractModule[Tensor[T], Tensor[T], T]]
   }
+
+  override private[zoo] def toKeras2(dir: String): String = {
+    val params = Net.inputShapeToString(inputShape) ++
+      Net.param(getName()) ++
+      Net.arrayToString(targetShape, "target_shape")
+    Net.kerasDef(this, params)
+  }
 }
 
 object Reshape {
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/utils/KerasUtils.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/utils/KerasUtils.scala
index 8c98f73f8c8..098298c161b 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/utils/KerasUtils.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/layers/utils/KerasUtils.scala
@@ -20,7 +20,7 @@
 import com.intel.analytics.bigdl.Criterion
 import com.intel.analytics.bigdl.nn.Graph.ModuleNode
 import com.intel.analytics.bigdl.nn._
 import com.intel.analytics.bigdl.nn.keras.{KerasIdentityWrapper, KerasLayer, KerasLayerWrapper, Sequential => KSequential, SoftMax => KSoftMax}
-import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity, DataFormat}
+import com.intel.analytics.bigdl.nn.abstractnn.{AbstractModule, Activity, DataFormat, TensorModule}
 import com.intel.analytics.bigdl.optim._
 import com.intel.analytics.bigdl.tensor.Tensor
 import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric
@@ -84,6 +84,32 @@ object KerasUtils {
     }
   }
 
+  def getActivationName[T: ClassTag](activation: AbstractModule[_, _, T]): String = {
+    if (activation == null) {
+      throw new IllegalArgumentException("activation is null")
+    } else {
+      activation match {
+        case _: Tanh[T] => "tanh"
+        case _: Sigmoid[T] => "sigmoid"
+        case _: ReLU[T] => "relu"
+        case _: com.intel.analytics.bigdl.nn.SoftMax[T] => "softmax"
+        case _: SoftPlus[T] => "softplus"
+        case _: SoftSign[T] => "softsign"
+        case _: HardSigmoid[T] => "hard_sigmoid"
+        case _: ReLU6[T] => "relu6"
+        case _: TanhShrink[T] => "tanh_shrink"
+        case _: SoftMin[T] => "softmin"
+        case _: LogSigmoid[T] => "log_sigmoid"
+        case _: LogSoftMax[T] => "log_softmax"
+        case _: Identity[T] => "linear"
+        case _: com.intel.analytics.zoo.pipeline.api.keras.layers.SoftMax[T] => "softmax"
+        case _ => throw new IllegalArgumentException("unknown activation: " +
+          activation.getClass.getName)
+      }
+    }
+  }
+
   def getTorchActivation[T : ClassTag] (activation: String)
     (implicit ev: TensorNumeric[T]): AbstractModule[Tensor[T], Tensor[T], T] = {
     if (activation == null) null
diff --git a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/models/Topology.scala b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/models/Topology.scala
index dd360a4c584..ed4b64e259f 100644
--- a/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/models/Topology.scala
+++ b/spark/dl/src/main/scala/com/intel/analytics/bigdl/dllib/zooKeras/models/Topology.scala
@@ -547,6 +547,29 @@ abstract class KerasNet[T](implicit val tag: ClassTag[T], implicit val ev: TensorNumeric[T])
 
   def toModel(): Model[T]
 
+  /**
+   * Save the model to a Keras 2 HDF5 file. For inference only.
+   * @param filePath path of the HDF5 file to save the model to.
+   * @param python path to the python executable; requires analytics-zoo and
+   *               tensorflow to be installed.
+   */
+  def saveToKeras2[T: ClassTag](
+      filePath: String,
+      python: String = "python")(implicit ev: TensorNumeric[T]): Unit = {
+    Net.saveToKeras2[T](this, filePath, python)
+  }
+
+  /**
+   * Save the model to a TensorFlow protobuf. For inference only.
+   * @param dir directory to save the model to.
+   * @param python path to the python executable; requires analytics-zoo and
+   *               tensorflow to be installed.
+   */
+  def saveToTf[T: ClassTag](
+      dir: String,
+      python: String = "python")(implicit ev: TensorNumeric[T]): Unit = {
+    Net.saveToTf[T](this, dir, python)
+  }
+
   /**
    * Print out the summary information of an Analytics Zoo Keras Model.
    *
@@ -892,6 +915,19 @@ class Sequential[T: ClassTag] private ()
     val graph = this.toModel()
     graph.summary(lineLength, positions)
   }
+
+  override private[zoo] def getKerasWeights(): Array[Tensor[Float]] = {
+    val weights = new ArrayBuffer[Tensor[Float]]()
+    modules(0).asInstanceOf[TSequential[T]].modules.foreach(m => {
+      val params = m.asInstanceOf[Net].getKerasWeights()
+      if (params != null) {
+        params.foreach { p =>
+          weights += p
+        }
+      }
+    })
+    weights.toArray
+  }
 }
 
 object Sequential extends KerasLayerSerializable {