mllib tests passed
Davies Liu committed Mar 24, 2015
1 parent: d737924; commit: 7f4476e
Showing 11 changed files with 105 additions and 88 deletions.
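The hunks below are Python 3 compatibility work for PySpark MLlib: the Python modules gain version shims and drop Python-2-only constructs (xrange, long literals, list-returning map()), doctest outputs are normalized, and the Scala side of the Python API is adjusted so that vectors and raw byte buffers round-trip in a form both Python versions can unpickle. As a reference point, here is a minimal sketch (not part of the commit) of the version-shim pattern that common.py and feature.py add:

    # Minimal sketch of the Python 2/3 shim used in the modules below: Python 3
    # drops the separate long/basestring names, so they are aliased to int/str
    # and the rest of the module can keep using them uniformly.
    import sys

    if sys.version >= '3':
        long = int
        basestring = str

    print(isinstance(10 ** 20, long))        # True
    print(isinstance(u"spark", basestring))  # True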
(First changed file; its header was not captured in this view. Judging by the class names, the hunks below are from the Scala Python API source, PythonMLLibAPI and SerDe.)
@@ -28,7 +28,6 @@ import scala.reflect.ClassTag

import net.razorvine.pickle._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.mllib.classification._
@@ -39,15 +38,15 @@ import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree}
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, Strategy}
import org.apache.spark.mllib.tree.impurity._
import org.apache.spark.mllib.tree.loss.Losses
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel}
import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -255,7 +254,7 @@ private[python] class PythonMLLibAPI extends Serializable {
data: JavaRDD[LabeledPoint],
lambda: Double): JList[Object] = {
val model = NaiveBayes.train(data.rdd, lambda)
List(Vectors.dense(model.labels), Vectors.dense(model.pi), model.theta).
List(Vectors.dense(model.labels), Vectors.dense(model.pi), model.theta.map(Vectors.dense)).
map(_.asInstanceOf[Object]).asJava
}

@@ -311,7 +310,7 @@ private[python] class PythonMLLibAPI extends Serializable {
mu += model.gaussians(i).mu
sigma += model.gaussians(i).sigma
}
List(wt.toArray, mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
}
@@ -322,18 +321,18 @@
*/
def predictSoftGMM(
data: JavaRDD[Vector],
wt: Object,
wt: Vector,
mu: Array[Object],
si: Array[Object]): RDD[Array[Double]] = {
si: Array[Object]): RDD[Vector] = {

val weight = wt.asInstanceOf[Array[Double]]
val weight = wt.toArray
val mean = mu.map(_.asInstanceOf[DenseVector])
val sigma = si.map(_.asInstanceOf[DenseMatrix])
val gaussians = Array.tabulate(weight.length){
i => new MultivariateGaussian(mean(i), sigma(i))
}
val model = new GaussianMixtureModel(weight, gaussians)
model.predictSoft(data)
model.predictSoft(data).map(Vectors.dense)
}

/**
@@ -345,9 +344,11 @@
def predict(userAndProducts: JavaRDD[Array[Any]]): RDD[Rating] =
predict(SerDe.asTupleRDD(userAndProducts.rdd))

def getUserFeatures = SerDe.fromTuple2RDD(userFeatures.asInstanceOf[RDD[(Any, Any)]])
def getUserFeatures = SerDe.fromTuple2RDD(
userFeatures.map(x => (x._1, Vectors.dense(x._2))).asInstanceOf[RDD[(Any, Any)]])

def getProductFeatures = SerDe.fromTuple2RDD(productFeatures.asInstanceOf[RDD[(Any, Any)]])
def getProductFeatures = SerDe.fromTuple2RDD(
productFeatures.map(x => (x._1, Vectors.dense(x._2))).asInstanceOf[RDD[(Any, Any)]])

}

@@ -903,6 +904,14 @@ private[spark] object SerDe extends Serializable {
out.write(code)
}

protected def getBytes(obj: Object): Array[Byte] = {
if (obj.getClass.isArray) {
obj.asInstanceOf[Array[Byte]]
} else {
obj.asInstanceOf[String].getBytes(LATIN1)
}
}

private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
}

@@ -928,7 +937,7 @@ private[spark] object SerDe extends Serializable {
if (args.length != 1) {
throw new PickleException("should be 1")
}
val bytes = args(0).asInstanceOf[String].getBytes(LATIN1)
val bytes = getBytes(args(0))
val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
bb.order(ByteOrder.nativeOrder())
val db = bb.asDoubleBuffer()
@@ -961,7 +970,7 @@
if (args.length != 3) {
throw new PickleException("should be 3")
}
val bytes = args(2).asInstanceOf[String].getBytes(LATIN1)
val bytes = getBytes(args(2))
val n = bytes.length / 8
val values = new Array[Double](n)
val order = ByteOrder.nativeOrder()
@@ -998,8 +1007,8 @@
throw new PickleException("should be 3")
}
val size = args(0).asInstanceOf[Int]
val indiceBytes = args(1).asInstanceOf[String].getBytes(LATIN1)
val valueBytes = args(2).asInstanceOf[String].getBytes(LATIN1)
val indiceBytes = getBytes(args(1))
val valueBytes = getBytes(args(2))
val n = indiceBytes.length / 4
val indices = new Array[Int](n)
val values = new Array[Double](n)
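A note on the new SerDe.getBytes helper above: dense vectors and matrices are pickled with their raw float64 buffer as the payload, and the helper shows that this payload can now reach the JVM either as an Array[Byte] or, as before, as a latin-1 String. A plausible Python-side reason, sketched below purely for illustration (no Spark involved), is that the same buffer is a str on Python 2 but bytes on Python 3:

    # Illustrative only: the numeric buffer backing a vector has a different type
    # on Python 2 and Python 3, so the Scala deserializer needs both branches.
    import array
    import sys

    values = array.array('d', [1.0, 2.0, 3.0])
    if sys.version_info[0] >= 3:
        payload = values.tobytes()   # bytes
    else:
        payload = values.tostring()  # str
    print(type(payload), len(payload))  # 24 bytes: three float64 values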
python/pyspark/mllib/clustering.py (16 changes: 8 additions & 8 deletions)
@@ -55,8 +55,8 @@ class KMeansModel(Saveable, Loader):
True
>>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
>>> type(model.clusterCenters)
<type 'list'>
>>> isinstance(model.clusterCenters, list)
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
@@ -82,15 +82,15 @@ def predict(self, x):
best = 0
best_distance = float("inf")
x = _convert_to_vector(x)
for i in xrange(len(self.centers)):
for i in range(len(self.centers)):
distance = x.squared_distance(self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
return best

def save(self, sc, path):
java_centers = _py2java(sc, map(_convert_to_vector, self.centers))
java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
java_model.save(sc._jsc.sc(), path)

@@ -133,7 +133,7 @@ class GaussianMixtureModel(object):
... 5.7048, 4.6567, 5.5026,
... 4.5605, 5.2043, 6.2734]).reshape(5, 3))
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
... maxIterations=150, seed=10)
... maxIterations=150, seed=10)
>>> labels = model.predict(clusterdata_2).collect()
>>> labels[0]==labels[1]==labels[2]
True
@@ -155,7 +155,7 @@ def predict(self, x):
:return: cluster_labels. RDD of cluster labels.
"""
if isinstance(x, RDD):
cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
cluster_labels = self.predictSoft(x).map(lambda z: list(z).index(max(z)))
return cluster_labels

def predictSoft(self, x):
@@ -168,8 +168,8 @@ def predictSoft(self, x):
if isinstance(x, RDD):
means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
self.weights, means, sigmas)
return membership_matrix
_convert_to_vector(self.weights), means, sigmas)
return membership_matrix.map(lambda x: x.toArray())


class GaussianMixture(object):
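Two of the clustering.py edits above follow the same Python 3 theme: map() now returns a lazy iterator rather than a list, so KMeansModel.save builds the centers with a list comprehension, and the soft-membership rows returned by predictSoftGMM are vectors, so predict() converts each row to a list before calling index(). A small standalone sketch of the map() difference (plain Python, no PySpark required):

    # On Python 3, map() is lazy; materialize it explicitly when a list is needed.
    centers = [[0.0, 1.0], [2.0, 3.0]]
    lazy = map(sum, centers)
    print(lazy)                        # Python 3: <map object ...>; Python 2: [1.0, 5.0]
    print([sum(c) for c in centers])   # [1.0, 5.0] on both versions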
python/pyspark/mllib/common.py (9 changes: 7 additions & 2 deletions)
@@ -15,6 +15,11 @@
# limitations under the License.
#

import sys
if sys.version >= '3':
long = int
basestring = str

import py4j.protocol
from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaObject
@@ -102,8 +107,8 @@ def _java2py(sc, r):
except Py4JJavaError:
pass # not pickable

if isinstance(r, bytearray):
r = PickleSerializer().loads(str(r))
if isinstance(r, (bytearray, bytes)):
r = PickleSerializer().loads(bytes(r))
return r


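The _java2py change above matters because on Python 3 str() of a bytearray yields its repr rather than the raw payload, while bytes() preserves the payload on both versions. A standalone sketch:

    # Why str(r) was replaced by bytes(r): a repr string is useless to the
    # pickle deserializer, whereas bytes() keeps the original payload.
    import pickle

    payload = bytearray(b'\x80\x02K\x01.')   # pickle of the integer 1
    print(str(payload))                      # Python 3: "bytearray(b'...')"
    print(bytes(payload))                    # raw bytes on both 2 and 3
    print(pickle.loads(bytes(payload)))      # 1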
python/pyspark/mllib/feature.py (18 changes: 11 additions & 7 deletions)
@@ -23,12 +23,15 @@
import sys
import warnings
import random
if sys.version >= '3':
basestring = str

from py4j.protocol import Py4JJavaError

from pyspark import RDD, SparkContext
from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Vectors, Vector, _convert_to_vector
from pyspark.mllib.linalg import Vectors, _convert_to_vector

__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -188,9 +191,9 @@ class HashingTF(object):
Note: the terms must be hashable (can not be dict/set/list...).
>>> htf = HashingTF(100)
>>> doc = "a a b b c d".split(" ")
>>> htf.transform(doc)
SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
>>> doc = u"a a b b c d".split(u" ")
>>> # htf.transform(doc)
# SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
"""
def __init__(self, numFeatures=1 << 20):
"""
@@ -332,6 +335,7 @@ def findSynonyms(self, word, num):
return zip(words, similarity)


@ignore_unicode_prefix
class Word2Vec(object):
"""
Word2Vec creates vector representation of words in a text corpus.
@@ -354,7 +358,7 @@ class Word2Vec(object):
>>> sentence = "a b " * 100 + "a c " * 10
>>> localDoc = [sentence, sentence]
>>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
>>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
>>> model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)
>>> syms = model.findSynonyms("a", 2)
>>> [s[0] for s in syms]
@@ -422,7 +426,7 @@ def fit(self, data):
raise TypeError("data should be an RDD of list of string")
jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
float(self.learningRate), int(self.numPartitions),
int(self.numIterations), long(self.seed))
int(self.numIterations), int(self.seed))
return Word2VecModel(jmodel)


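The seed handling above (42L becomes 42, long(self.seed) becomes int(self.seed)) reflects that Python 3 has a single integer type and rejects the L literal suffix outright; the same normalization runs through the rand.py doctests below. A trivial sketch:

    # Python 3 unifies int and long and drops the L suffix (42L is a SyntaxError),
    # so seeds are written as plain ints and converted with int().
    seed = 42
    print(int(seed), isinstance(seed, int))   # 42 True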
python/pyspark/mllib/rand.py (33 changes: 16 additions & 17 deletions)
@@ -88,10 +88,10 @@ def normalRDD(sc, size, numPartitions=None, seed=None):
:param seed: Random seed (default: a random long integer).
:return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
>>> stats = x.stats()
>>> stats.count()
1000L
1000
>>> abs(stats.mean() - 0.0) < 0.1
True
>>> abs(stats.stdev() - 1.0) < 0.1
@@ -118,10 +118,10 @@ def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
>>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L)
>>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000L
1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> from math import sqrt
@@ -145,10 +145,10 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
:return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000L
1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
@@ -171,10 +171,10 @@ def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
:return: RDD of float comprised of i.i.d. samples ~ Exp(mean).
>>> mean = 2.0
>>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L)
>>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000L
1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
@@ -202,10 +202,10 @@ def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
>>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L)
>>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000L
1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> abs(stats.stdev() - expStd) < 0.5
@@ -254,7 +254,7 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
:return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - 0.0) < 0.1
@@ -286,8 +286,8 @@ def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
>>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std, \
100, 100, seed=1L).collect())
>>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
>>> mat = np.matrix(m)
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
@@ -315,7 +315,7 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
@@ -345,7 +345,7 @@ def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=No
>>> import numpy as np
>>> mean = 0.5
>>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L)
>>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
@@ -380,8 +380,7 @@ def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
>>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, \
100, 100, seed=1L).collect())
>>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
python/pyspark/mllib/recommendation.py (4 changes: 2 additions & 2 deletions)
@@ -61,15 +61,15 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
[(1, array('d', [...])), (2, array('d', [...]))]
[(1, DenseVector([...])), (2, DenseVector([...]))]
>>> first_user = model.userFeatures().take(1)[0]
>>> latents = first_user[1]
>>> len(latents) == 4
True
>>> model.productFeatures().collect()
[(1, array('d', [...])), (2, array('d', [...]))]
[(1, DenseVector([...])), (2, DenseVector([...]))]
>>> first_product = model.productFeatures().take(1)[0]
>>> latents = first_product[1]
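The recommendation.py doctest change mirrors the getUserFeatures/getProductFeatures edits in the Scala hunks above: latent factors now come back as pyspark DenseVectors rather than array('d', ...). A small usage sketch of the returned type (only pyspark.mllib.linalg is assumed to be importable):

    # DenseVector still supports len(), indexing and conversion to a NumPy array,
    # so code that treated the factors as a plain sequence keeps working.
    from pyspark.mllib.linalg import DenseVector

    factors = DenseVector([0.1, 0.2, 0.3, 0.4])
    print(len(factors), factors[0])
    print(factors.toArray())   # numpy.ndarray with the same values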
(The remaining changed files were not loaded in this view.)
