
Commit

pass ml tests
Davies Liu committed Mar 24, 2015
1 parent 375ea17 commit d737924
Showing 6 changed files with 49 additions and 37 deletions.
4 changes: 2 additions & 2 deletions python/pyspark/ml/classification.py
@@ -39,10 +39,10 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
     >>> model = lr.fit(df)
     >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
-    >>> print model.transform(test0).head().prediction
+    >>> model.transform(test0).head().prediction
     0.0
     >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
-    >>> print model.transform(test1).head().prediction
+    >>> model.transform(test1).head().prediction
     1.0
     >>> lr.setParams("vector")
     Traceback (most recent call last):
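A note on the doctest change above: `print model.transform(...)` is Python 2-only statement syntax, so these examples would not even parse under Python 3. Evaluating the expression instead makes doctest compare against the value's repr, which is the same on both versions. A minimal standalone sketch of the pattern, with a made-up `predict` function standing in for the model:

    def predict(x):
        """Toy stand-in for model.transform(...).head().prediction.

        >>> predict(-1.0)
        0.0
        >>> predict(1.0)
        1.0
        """
        return 0.0 if x < 0 else 1.0

    if __name__ == "__main__":
        import doctest
        doctest.testmod()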
22 changes: 12 additions & 10 deletions python/pyspark/ml/feature.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #

+from pyspark.rdd import ignore_unicode_prefix
 from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures
 from pyspark.ml.util import keyword_only
 from pyspark.ml.wrapper import JavaTransformer
@@ -24,6 +25,7 @@


 @inherit_doc
+@ignore_unicode_prefix
 class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
     """
     A tokenizer that converts the input string to lowercase and then
@@ -32,15 +34,15 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
     >>> from pyspark.sql import Row
     >>> df = sc.parallelize([Row(text="a b c")]).toDF()
     >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
-    >>> print tokenizer.transform(df).head()
+    >>> tokenizer.transform(df).head()
     Row(text=u'a b c', words=[u'a', u'b', u'c'])
     >>> # Change a parameter.
-    >>> print tokenizer.setParams(outputCol="tokens").transform(df).head()
+    >>> tokenizer.setParams(outputCol="tokens").transform(df).head()
     Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
     >>> # Temporarily modify a parameter.
-    >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
+    >>> tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
     Row(text=u'a b c', words=[u'a', u'b', u'c'])
-    >>> print tokenizer.transform(df).head()
+    >>> tokenizer.transform(df).head()
     Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
     >>> # Must use keyword arguments to specify params.
     >>> tokenizer.setParams("text")
@@ -79,13 +81,13 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
     >>> from pyspark.sql import Row
     >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF()
     >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
-    >>> print hashingTF.transform(df).head().features
-    (10,[7,8,9],[1.0,1.0,1.0])
-    >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
-    (10,[7,8,9],[1.0,1.0,1.0])
+    >>> hashingTF.transform(df).head().features
+    SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0})
+    >>> hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
+    SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0})
     >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
-    >>> print hashingTF.transform(df, params).head().vector
-    (5,[2,3,4],[1.0,1.0,1.0])
+    >>> hashingTF.transform(df, params).head().vector
+    SparseVector(5, {2: 1.0, 3: 1.0, 4: 1.0})
     """

     _java_class = "org.apache.spark.ml.feature.HashingTF"
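Two things happen in feature.py. First, the expected outputs switch from the str() form of a SparseVector, e.g. (10,[7,8,9],[1.0,1.0,1.0]), to its repr, SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0}), since the expressions are now evaluated rather than printed. Second, the new @ignore_unicode_prefix decorator (imported from pyspark.rdd) deals with the u'...' prefixes in expected output such as Row(text=u'a b c', ...), which Python 3's repr no longer emits. A rough sketch of what such a decorator plausibly does -- an illustration of the idea, not the PySpark implementation:

    import re
    import sys

    def ignore_unicode_prefix_sketch(obj):
        # On Python 3, drop u'...' / u"..." prefixes from the docstring so the
        # doctest expected output matches repr() on both major versions.
        if sys.version_info[0] >= 3 and obj.__doc__:
            obj.__doc__ = re.sub(r"\bu(['\"])", r"\1", obj.__doc__)
        return obj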
2 changes: 1 addition & 1 deletion python/pyspark/mllib/common.py
@@ -36,7 +36,7 @@

 def _new_smart_decode(obj):
     if isinstance(obj, float):
-        s = unicode(obj)
+        s = str(obj)
         return _float_str_mapping.get(s, s)
     return _old_smart_decode(obj)

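The unicode built-in does not exist on Python 3, where str already produces the same text for a float, so str(obj) works on both versions. A standalone sketch of the decode path, assuming a special-value mapping along the lines of the one in pyspark/mllib/common.py:

    _float_str_mapping = {"nan": "NaN", "inf": "Infinity", "-inf": "-Infinity"}

    def smart_decode(obj):
        # Map Python's float spellings to the Java-side ones; str(obj) replaces
        # the Python 2-only unicode(obj).
        if isinstance(obj, float):
            s = str(obj)
            return _float_str_mapping.get(s, s)
        return obj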
46 changes: 27 additions & 19 deletions python/pyspark/mllib/linalg.py
@@ -22,10 +22,18 @@
 object from MLlib or pass SciPy C{scipy.sparse} column vectors if
 SciPy is available in their environment.
 """
+from __future__ import print_function

+import sys
 import array
-import copy_reg
+try:
+    import copy_reg
+except ImportError:
+    import copyreg as copy_reg

+if sys.version >= '3':
+    basestring = str
+    xrange = range

 import numpy as np

@@ -57,7 +65,7 @@ def fast_pickle_array(ar):
 def _convert_to_vector(l):
     if isinstance(l, Vector):
         return l
-    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+    elif type(l) in (array.array, np.array, np.ndarray, list, tuple, xrange):
         return DenseVector(l)
     elif _have_scipy and scipy.sparse.issparse(l):
         assert l.shape[1] == 1, "Expected column vector"
@@ -88,7 +96,7 @@ def _vector_size(v):
     """
     if isinstance(v, Vector):
         return len(v)
-    elif type(v) in (array.array, list, tuple):
+    elif type(v) in (array.array, list, tuple, xrange):
         return len(v)
     elif type(v) == np.ndarray:
         if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
@@ -176,7 +184,7 @@ class DenseVector(Vector):
     A dense vector represented by a value array.
     """
     def __init__(self, ar):
-        if isinstance(ar, basestring):
+        if isinstance(ar, bytes):
             ar = np.frombuffer(ar, dtype=np.float64)
         elif not isinstance(ar, np.ndarray):
             ar = np.array(ar, dtype=np.float64)
@@ -308,12 +316,12 @@ def __init__(self, size, *args):
         :param args: Non-zero entries, as a dictionary, list of tupes,
                or two sorted lists containing indices and values.

-        >>> print SparseVector(4, {1: 1.0, 3: 5.5})
-        (4,[1,3],[1.0,5.5])
-        >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
-        (4,[1,3],[1.0,5.5])
-        >>> print SparseVector(4, [1, 3], [1.0, 5.5])
-        (4,[1,3],[1.0,5.5])
+        >>> SparseVector(4, {1: 1.0, 3: 5.5})
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> SparseVector(4, [(1, 1.0), (3, 5.5)])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> SparseVector(4, [1, 3], [1.0, 5.5])
+        SparseVector(4, {1: 1.0, 3: 5.5})
         """
         self.size = int(size)
         assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -325,8 +333,8 @@ def __init__(self, size, *args):
             self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
             self.values = np.array([p[1] for p in pairs], dtype=np.float64)
         else:
-            if isinstance(args[0], basestring):
-                assert isinstance(args[1], str), "values should be string too"
+            if isinstance(args[0], bytes):
+                assert isinstance(args[1], bytes), "values should be string too"
                 if args[0]:
                     self.indices = np.frombuffer(args[0], np.int32)
                     self.values = np.frombuffer(args[1], np.float64)
@@ -555,12 +563,12 @@ def sparse(size, *args):
         :param args: Non-zero entries, as a dictionary, list of tupes,
               or two sorted lists containing indices and values.

-        >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
-        (4,[1,3],[1.0,5.5])
-        >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
-        (4,[1,3],[1.0,5.5])
-        >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
-        (4,[1,3],[1.0,5.5])
+        >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+        SparseVector(4, {1: 1.0, 3: 5.5})
         """
         return SparseVector(size, *args)

@@ -611,7 +619,7 @@ class DenseMatrix(Matrix):
     """
     def __init__(self, numRows, numCols, values):
         Matrix.__init__(self, numRows, numCols)
-        if isinstance(values, basestring):
+        if isinstance(values, bytes):
             values = np.frombuffer(values, dtype=np.float64)
         elif not isinstance(values, np.ndarray):
             values = np.array(values, dtype=np.float64)
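The linalg.py changes are the standard 2/3 compatibility moves: copy_reg was renamed copyreg, basestring and xrange are gone, and serialized data that used to arrive as a Python 2 str now arrives as bytes, so the frombuffer paths check isinstance(..., bytes). A self-contained sketch of the same pattern outside Spark (dense_from_any is a made-up helper, not PySpark API):

    import sys

    try:
        import copy_reg              # Python 2 name
    except ImportError:
        import copyreg as copy_reg   # Python 3 name

    if sys.version >= '3':
        basestring = str             # keep old isinstance checks working
        xrange = range               # keep old range-style iteration working

    import numpy as np

    def dense_from_any(ar):
        # Serialized vectors come in as raw bytes, so test against bytes rather
        # than the Python 2-only basestring.
        if isinstance(ar, bytes):
            return np.frombuffer(ar, dtype=np.float64)
        return np.array(ar, dtype=np.float64)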
9 changes: 7 additions & 2 deletions python/pyspark/serializers.py
@@ -59,8 +59,10 @@
 import itertools
 try:
     import cPickle as pickle
+    protocol = 2
 except ImportError:
     import pickle
+    protocol = 3

 from pyspark import cloudpickle

@@ -383,10 +385,13 @@ class PickleSerializer(FramedSerializer):
     """

     def dumps(self, obj):
-        return pickle.dumps(obj, 2)
+        return pickle.dumps(obj, protocol)

     def loads(self, obj):
-        return pickle.loads(obj)
+        if sys.version >= '3':
+            return pickle.loads(obj, encoding='bytes')
+        else:
+            return pickle.loads(obj)


 class CloudPickleSerializer(PickleSerializer):
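The serializer now picks the pickle protocol from the interpreter: cPickle on Python 2 tops out at protocol 2, while Python 3's pickle supports protocol 3, and loads() gets encoding='bytes' so byte strings pickled by Python 2 come back as bytes under Python 3 instead of failing to decode. A standalone sketch of the idea, without the Spark framing:

    import sys

    try:
        import cPickle as pickle   # Python 2
        protocol = 2
    except ImportError:
        import pickle              # Python 3
        protocol = 3

    def dumps(obj):
        return pickle.dumps(obj, protocol)

    def loads(data):
        if sys.version >= '3':
            # Python 2 str objects unpickle as bytes under Python 3.
            return pickle.loads(data, encoding='bytes')
        return pickle.loads(data)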
3 changes: 0 additions & 3 deletions python/pyspark/sql/tests.py
@@ -235,9 +235,6 @@ def test_infer_nested_schema(self):

     def test_apply_schema(self):
         from datetime import date, datetime
-        if sys.version >= '3':
-            # TODO(davies): fix deserialize datetime from python3 in Pyrolite
-            return
         rdd = self.sc.parallelize([(127, -128, -32768, 32767, 2147483647, 1.0,
                                     date(2010, 1, 1), datetime(2010, 1, 1, 1, 1, 1),
                                     {"a": 1}, (2,), [1, 2, 3], None)])
