support multi input models for nnframes (#1553)
* support multi input for nnframes

* update ut

* add doc and unit test

* doc update

* scala style
hhbyyh committed Aug 20, 2019
1 parent dc167f3 commit 3ae79ea
Showing 11 changed files with 406 additions and 23 deletions.
68 changes: 65 additions & 3 deletions docs/docs/APIGuide/PipelineAPI/nnframes.md
@@ -50,7 +50,15 @@ More concrete examples are available in package `com.intel.analytics.zoo.example
type are supported) and convert each feature/label to Tensor according to the specified Tensor
size.

-**3.** `NNEstimator(model, criterion, featurePreprocessing: Preprocessing[F, Tensor[T]],
+**3.** `NNEstimator(model, criterion, featureSize: Array[Array[Int]], labelSize: Array[Int])`
+
+This is the interface for multi-input models. It takes model, criterion, featureSize (Array of
+Int Array) and labelSize (Array of Int). `NNEstimator`
+will extract the data from the feature and label columns (only Scalar, Array[_] or Vector data
+types are supported), split each feature row into one Tensor per entry of featureSize, and
+convert each label to a Tensor of the specified labelSize.
+
+**4.** `NNEstimator(model, criterion, featurePreprocessing: Preprocessing[F, Tensor[T]],
labelPreprocessing: Preprocessing[F, Tensor[T]])`

Takes model, criterion, featurePreprocessing and labelPreprocessing. `NNEstimator`
@@ -109,6 +117,57 @@ estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2]))\
nnModel = estimator.fit(df)
res = nnModel.transform(df)
```

***Example with a multi-input model.***
This example trains a model with 3 inputs. Users can
use VectorAssembler from Spark MLlib to combine different fields into a single features column.
With the specified sizes for each model input, NNEstimator and NNClassifier will split the
input feature data and send the resulting tensors to the corresponding inputs.
```python
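# Imports mirror those used by the accompanying unit test.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

from bigdl.nn.criterion import ClassNLLCriterion
from bigdl.optim.optimizer import Adam
from zoo.common.nncontext import *
from zoo.pipeline.api.keras import layers as ZLayer
from zoo.pipeline.api.keras.models import Model as ZModel
from zoo.pipeline.nnframes import *

sparkConf = init_spark_conf().setAppName("testNNClassifer").setMaster('local[1]')
sc = init_nncontext(sparkConf)
spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark.createDataFrame(
    [(1, 35, 109.0, Vectors.dense([2.0, 5.0, 0.5, 0.5]), 1.0),
     (2, 58, 2998.0, Vectors.dense([4.0, 10.0, 0.5, 0.5]), 2.0),
     (3, 18, 123.0, Vectors.dense([3.0, 15.0, 0.5, 0.5]), 1.0)],
    ["user", "age", "income", "history", "label"])

# Combine the four fields into one "features" vector of 7 values per row.
assembler = VectorAssembler(
    inputCols=["user", "age", "income", "history"],
    outputCol="features")

df = assembler.transform(df)

# One Input per model input; the shapes match the feature sizes passed to NNClassifier.
x1 = ZLayer.Input(shape=(1,))
x2 = ZLayer.Input(shape=(2,))
x3 = ZLayer.Input(shape=(2, 2,))

user_embedding = ZLayer.Embedding(5, 10)(x1)
flatten = ZLayer.Flatten()(user_embedding)
dense1 = ZLayer.Dense(2)(x2)
lstm = ZLayer.LSTM(4, input_shape=(2, 2))(x3)

merged = ZLayer.merge([flatten, dense1, lstm], mode="concat")
zy = ZLayer.Dense(2)(merged)

zmodel = ZModel([x1, x2, x3], zy)
criterion = ClassNLLCriterion()
classifier = NNClassifier(zmodel, criterion, [[1], [2], [2, 2]]) \
    .setOptimMethod(Adam()) \
    .setLearningRate(0.1) \
    .setBatchSize(2) \
    .setMaxEpoch(10)

nnClassifierModel = classifier.fit(df)
print(nnClassifierModel.getBatchSize())
res = nnClassifierModel.transform(df).collect()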

```
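
To make the split concrete, here is a minimal pure-Python sketch of how one assembled feature row from the example above could be divided among the three inputs. It assumes the values are consumed in order and laid out row-major; `split_features` and `reshape` are hypothetical helpers written for illustration and are not part of the nnframes API.

```python
from math import prod  # Python 3.8+


def reshape(values, size):
    """Nest a flat list into the given shape (row-major); illustrative only."""
    if len(size) == 1:
        return list(values)
    step = prod(size[1:])
    return [reshape(values[i * step:(i + 1) * step], size[1:])
            for i in range(size[0])]


def split_features(flat, sizes):
    """Hypothetical helper: cut a flat feature row into one nested list per input."""
    chunks, cur = [], 0
    for size in sizes:
        n = prod(size)  # number of scalars this input consumes
        chunks.append(reshape(flat[cur:cur + n], size))
        cur += n
    return chunks


# First row after VectorAssembler: user, age, income, then the 4 history values.
row = [1.0, 35.0, 109.0, 2.0, 5.0, 0.5, 0.5]
x1, x2, x3 = split_features(row, [[1], [2], [2, 2]])
print(x1)  # [1.0]
print(x2)  # [35.0, 109.0]
print(x3)  # [[2.0, 5.0], [0.5, 0.5]]
```

Under these assumptions, `user` feeds the first input, `age` and `income` feed the second, and the four `history` values feed the third as a 2 x 2 tensor.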

---

## NNModel
Expand Down Expand Up @@ -141,7 +200,8 @@ and use it as a transformer in your Spark ML pipeline to predict the results for

Takes model and featureSize(Array of Int). `NNModel` will extract the data from feature
column (only Scalar, Array[_] or Vector data type are supported) and convert each feature
-to Tensor according to the specified Tensor size.
+to Tensor according to the specified Tensor size. Users can also set featureSize to
+Array[Array[Int]] for multi-input models.

**3.** `NNModel(model, featurePreprocessing: Preprocessing[F, Tensor[T]])`

Expand Down Expand Up @@ -189,6 +249,7 @@ DoubleType.
Takes model, criterion, featureSize(Array of Int). `NNClassifier`
will extract the data from feature and label columns and convert each feature to Tensor
according to the specified Tensor size. `ScalarToTensor` is used to convert the label column.
Users can also set featureSize to Array[Array[Int]] for multi-input models.

**3.** `NNClassifier(model, criterion, featurePreprocessing: Preprocessing[F, Tensor[T]])`

Expand Down Expand Up @@ -276,7 +337,8 @@ Both label and prediction column will have the datatype of Double.

Takes model and featureSize(Array of Int). `NNClassifierModel` will extract the data from feature
column (only Scalar, Array[_] or Vector data type are supported) and convert each feature
-to Tensor according to the specified Tensor size.
+to Tensor according to the specified Tensor size. Users can also set featureSize to
+Array[Array[Int]] for multi-input models.

**3.** `NNClassifierModel(model, featurePreprocessing: Preprocessing[F, Tensor[T]])`

62 changes: 62 additions & 0 deletions pyzoo/test/zoo/pipeline/nnframes/test_nn_classifier.py
@@ -29,6 +29,8 @@
from zoo.common.nncontext import *
from zoo.pipeline.nnframes import *
from zoo.pipeline.api.keras.optimizers import Adam as KAdam
from zoo.pipeline.api.keras import layers as ZLayer
from zoo.pipeline.api.keras.models import Model as ZModel
from zoo.feature.common import *
from zoo.feature.image import *
from zoo.util.tf import *
@@ -311,6 +313,66 @@ def test_NNEstimator_checkpoint(self):
if exc.errno != errno.ENOENT: # ENOENT - no such file or directory
raise # re-raise exception

    def test_NNEstimator_multi_input(self):
        zx1 = ZLayer.Input(shape=(1, ))
        zx2 = ZLayer.Input(shape=(1, ))
        zz = ZLayer.merge([zx1, zx2], mode="concat")
        zy = ZLayer.Dense(2)(zz)
        zmodel = ZModel([zx1, zx2], zy)

        criterion = MSECriterion()
        df = self.get_estimator_df()
        estimator = NNEstimator(zmodel, criterion, [[1], [1]]).setMaxEpoch(5) \
            .setBatchSize(4)
        nnmodel = estimator.fit(df)
        nnmodel.transform(df).collect()

    def test_NNEstimator_works_with_VectorAssembler_multi_input(self):
        if self.sc.version.startswith("2"):
            from pyspark.ml.linalg import Vectors
            from pyspark.ml.feature import VectorAssembler
            from pyspark.sql import SparkSession

            spark = SparkSession \
                .builder \
                .getOrCreate()

            df = spark.createDataFrame(
                [(1, 35, 109.0, Vectors.dense([2.0, 5.0, 0.5, 0.5]), 1.0),
                 (2, 58, 2998.0, Vectors.dense([4.0, 10.0, 0.5, 0.5]), 2.0),
                 (3, 18, 123.0, Vectors.dense([3.0, 15.0, 0.5, 0.5]), 1.0)],
                ["user", "age", "income", "history", "label"])

            assembler = VectorAssembler(
                inputCols=["user", "age", "income", "history"],
                outputCol="features")

            df = assembler.transform(df)

            x1 = ZLayer.Input(shape=(1,))
            x2 = ZLayer.Input(shape=(2,))
            x3 = ZLayer.Input(shape=(2, 2,))

            user_embedding = ZLayer.Embedding(5, 10)(x1)
            flatten = ZLayer.Flatten()(user_embedding)
            dense1 = ZLayer.Dense(2)(x2)
            gru = ZLayer.LSTM(4, input_shape=(2, 2))(x3)

            merged = ZLayer.merge([flatten, dense1, gru], mode="concat")
            zy = ZLayer.Dense(2)(merged)

            zmodel = ZModel([x1, x2, x3], zy)
            criterion = ClassNLLCriterion()
            classifier = NNClassifier(zmodel, criterion, [[1], [2], [2, 2]]) \
                .setOptimMethod(Adam()) \
                .setLearningRate(0.1) \
                .setBatchSize(2) \
                .setMaxEpoch(10)

            nnClassifierModel = classifier.fit(df)
            print(nnClassifierModel.getBatchSize())
            res = nnClassifierModel.transform(df).collect()

    def test_NNModel_transform_with_nonDefault_featureCol(self):
        model = Sequential().add(Linear(2, 2))
        nnModel = NNModel(model, SeqToTensor([2]))\
9 changes: 9 additions & 0 deletions pyzoo/zoo/feature/common.py
@@ -143,6 +143,15 @@ def __init__(self, size=[], bigdl_type="float"):
super(SeqToTensor, self).__init__(bigdl_type, size)


class SeqToMultipleTensors(Preprocessing):
    """
    A Transformer that converts an Array[_], Seq[_] or ML Vector to several tensors.
    :param size: list of int lists, the dimensions of the target Tensors, e.g. [[2], [4]]
    """
    def __init__(self, size=[], bigdl_type="float"):
        super(SeqToMultipleTensors, self).__init__(bigdl_type, size)


class ArrayToTensor(Preprocessing):
"""
a Transformer that converts an Array[_] to a Tensor.
12 changes: 8 additions & 4 deletions pyzoo/zoo/pipeline/nnframes/nn_classifier.py
@@ -190,8 +190,10 @@ def __init__(self, model, criterion,
             label_preprocessing = SeqToTensor()

         if type(feature_preprocessing) is list:
-            assert(all(isinstance(x, int) for x in feature_preprocessing))
-            feature_preprocessing = SeqToTensor(feature_preprocessing)
+            if type(feature_preprocessing[0]) is list:
+                feature_preprocessing = SeqToMultipleTensors(feature_preprocessing)
+            elif isinstance(feature_preprocessing[0], int):
+                feature_preprocessing = SeqToTensor(feature_preprocessing)

         if type(label_preprocessing) is list:
             assert(all(isinstance(x, int) for x in label_preprocessing))
@@ -461,8 +463,10 @@ def __init__(self, model, feature_preprocessing=None, jvalue=None, bigdl_type="f
             feature_preprocessing = SeqToTensor()

         if type(feature_preprocessing) is list:
-            assert(all(isinstance(x, int) for x in feature_preprocessing))
-            feature_preprocessing = SeqToTensor(feature_preprocessing)
+            if type(feature_preprocessing[0]) is list:
+                feature_preprocessing = SeqToMultipleTensors(feature_preprocessing)
+            elif isinstance(feature_preprocessing[0], int):
+                feature_preprocessing = SeqToTensor(feature_preprocessing)

         sample_preprocessing = ChainedPreprocessing([feature_preprocessing, TensorToSample()])
         self.value = callBigDlFunc(
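
The dispatch in the two hunks above inspects only the first element of the size list. The following standalone sketch shows the same decision with explicit guards; the function name `choose_feature_preprocessing` and the string return values are hypothetical, and the empty-list and type checks are additions that the commit itself does not make (there, an empty list would raise `IndexError` on `feature_preprocessing[0]`, and a list of any other element type would be passed through unchanged).

```python
def choose_feature_preprocessing(feature_preprocessing):
    """Return the name of the preprocessing a size spec would map to (illustrative)."""
    if not isinstance(feature_preprocessing, list):
        # Anything that is not a size list is treated as a ready-made Preprocessing.
        return "custom"
    if not feature_preprocessing:
        # Guard added for this sketch only.
        raise ValueError("feature size list must not be empty")
    first = feature_preprocessing[0]
    if isinstance(first, list):
        return "SeqToMultipleTensors"  # e.g. [[1], [2], [2, 2]]
    if isinstance(first, int):
        return "SeqToTensor"           # e.g. [2]
    # Guard added for this sketch only.
    raise TypeError("sizes must be ints or lists of ints, got %r" % (first,))


print(choose_feature_preprocessing([2]))                 # SeqToTensor
print(choose_feature_preprocessing([[1], [2], [2, 2]]))  # SeqToMultipleTensors
```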
@@ -30,29 +30,44 @@ import scala.reflect.ClassTag
  * @tparam F data type from feature column, E.g. Array[_] or Vector
  * @tparam L data type from label column, E.g. Float, Double, Array[_] or Vector
  */
-class FeatureLabelPreprocessing[F, L, T: ClassTag](
-    featureStep: Preprocessing[F, Tensor[T]],
-    labelStep: Preprocessing[L, Tensor[T]])(implicit ev: TensorNumeric[T])
-  extends Preprocessing[(F, Option[L]), Sample[T]] {
+class FeatureLabelPreprocessing[F, X, L, T: ClassTag] private[zoo] (
+    featureStep: Preprocessing[F, X],
+    labelStep: Preprocessing[L, Tensor[T]]
+  )(implicit ev: TensorNumeric[T]) extends Preprocessing[(F, Option[L]), Sample[T]] {

   override def apply(prev: Iterator[(F, Option[L])]): Iterator[Sample[T]] = {
     prev.map { case (feature, label ) =>
-      val featureTensor = featureStep(Iterator(feature)).next()
-      label match {
-        case Some(l) =>
-          val labelTensor = labelStep(Iterator(l)).next()
-          Sample[T](featureTensor, labelTensor)
-        case None =>
-          Sample[T](featureTensor)
+      val featureTensors = featureStep(Iterator(feature)).next()
+      featureTensors match {
+        case ft: Tensor[T] =>
+          val ft = featureTensors.asInstanceOf[Tensor[T]]
+          label match {
+            case Some(l) =>
+              val labelTensor = labelStep(Iterator(l)).next()
+              Sample[T](ft, labelTensor)
+            case None =>
+              Sample[T](ft)
+          }
+        case fat: Array[Tensor[T]] =>
+          label match {
+            case Some(l) =>
+              val labelTensor = labelStep(Iterator(l)).next()
+              Sample[T](fat, labelTensor)
+            case None =>
+              Sample[T](fat)
+          }
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"FeatureLabelPreprocessing expects table or tensor, but got $featureTensors")
       }
     }
   }
 }

 object FeatureLabelPreprocessing {
-  def apply[F, L, T: ClassTag](
-      featureStep: Preprocessing[F, Tensor[T]],
+  def apply[F, X, L, T: ClassTag](
+      featureStep: Preprocessing[F, X],
       labelStep: Preprocessing[L, Tensor[T]]
-    )(implicit ev: TensorNumeric[T]): FeatureLabelPreprocessing[F, L, T] =
+    )(implicit ev: TensorNumeric[T]): FeatureLabelPreprocessing[F, X, L, T] =
     new FeatureLabelPreprocessing(featureStep, labelStep)
 }
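
The new `apply` accepts either a single tensor or an array of tensors from the feature step and attaches the label when one is present. As a rough illustration of that behaviour, the Python sketch below normalises the features to a list first so that the optional label is handled once. `Tensor`, `Sample` and `to_sample` are stand-ins defined here for illustration; they are not the BigDL classes, and this is an alternative arrangement rather than the commit's implementation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Tensor:
    """Stand-in for a BigDL Tensor: flat values plus a shape."""
    values: List[float]
    shape: List[int]


@dataclass
class Sample:
    """Stand-in for a BigDL Sample: feature tensors and an optional label."""
    features: List[Tensor]
    label: Optional[Tensor] = None


def to_sample(features, label=None):
    """Build a Sample from one Tensor or a list of Tensors, with an optional label."""
    if isinstance(features, Tensor):
        feature_list = [features]      # single-input model
    elif isinstance(features, (list, tuple)) and all(
            isinstance(f, Tensor) for f in features):
        feature_list = list(features)  # multi-input model
    else:
        raise TypeError(
            "expected a Tensor or a list of Tensors, got %r" % (features,))
    return Sample(feature_list, label)


single = to_sample(Tensor([1.0, 2.0], [2]), Tensor([1.0], [1]))
multi = to_sample([Tensor([1.0], [1]), Tensor([35.0, 109.0], [2])])
print(len(single.features), single.label is not None)  # 1 True
print(len(multi.features), multi.label is None)        # 2 True
```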
@@ -0,0 +1,38 @@
/*
* Copyright 2018 Analytics Zoo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.intel.analytics.zoo.feature.common

import com.intel.analytics.bigdl.dataset.Sample
import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric

import scala.reflect.ClassTag

/**
 * a Preprocessing that converts multiple tensors to Sample.
 */
class MultiTensorsToSample[T: ClassTag]()(implicit ev: TensorNumeric[T])
  extends Preprocessing[Array[Tensor[T]], Sample[T]] {

  override def apply(prev: Iterator[Array[Tensor[T]]]): Iterator[Sample[T]] = {
    prev.map(Sample(_))
  }
}

object MultiTensorsToSample {
  def apply[F, T: ClassTag]()(implicit ev: TensorNumeric[T]): MultiTensorsToSample[T] =
    new MultiTensorsToSample()
}
@@ -0,0 +1,72 @@
/*
* Copyright 2018 Analytics Zoo Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.intel.analytics.zoo.feature.common

import com.intel.analytics.bigdl.tensor.Tensor
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric

import scala.reflect.ClassTag

/**
 * a Preprocessing that converts Array[Float], Array[Double] or MLlib Vector to multiple tensors
 * for multi-input models
 * @param multiSizes dimensions of target Tensors.
 */
class SeqToMultipleTensors[T: ClassTag](multiSizes: Array[Array[Int]])
  (implicit ev: TensorNumeric[T]) extends Preprocessing[Any, Array[Tensor[T]]] {

  override def apply(prev: Iterator[Any]): Iterator[Array[Tensor[T]]] = {
    prev.map { f =>
      val tensors = f match {
        case sd: Seq[Any] => matchSeq(sd)
        case _ => throw new IllegalArgumentException("SeqToMultipleTensors only supports " +
          s"Float, Double, Array[Float], Array[Double] or MLlib Vector but got $f")
      }
      tensors
    }
  }

  def matchSeq(list: Seq[Any]): Array[Tensor[T]] = {
    // Convert every element to T based on the runtime type of the first element.
    val rawData = list.head match {
      case dd: Double => list.asInstanceOf[Seq[Double]].map(ev.fromType(_)).toArray
      case ff: Float => list.asInstanceOf[Seq[Float]].map(ev.fromType(_)).toArray
      case ii: Int => list.asInstanceOf[Seq[Int]].map(ev.fromType(_)).toArray
      case _ => throw new IllegalArgumentException(
        s"SeqToMultipleTensors only supports Array[Int], " +
        s"Array[Float] and Array[Double] for ArrayType, but got $list")
    }

    // The flat data must fill every target tensor exactly.
    require(multiSizes.map(s => s.product).sum == rawData.length, s"feature columns length " +
      s"${rawData.length} does not match with the sum of tensors" +
      s" ${multiSizes.map(a => a.mkString(",")).mkString("\n")}")

    // Slice consecutive chunks of rawData, one per target size.
    var cur = 0
    val tensors = multiSizes.map { size =>
      val rawLength = size.product
      val t = Tensor(rawData.slice(cur, cur + rawLength), size)
      cur += rawLength
      t
    }
    tensors
  }
}


object SeqToMultipleTensors {
  def apply[T: ClassTag](
      multiSizes: Array[Array[Int]]
    )(implicit ev: TensorNumeric[T]): SeqToMultipleTensors[T] =
    new SeqToMultipleTensors[T](multiSizes)
}