Skip to content

Commit

Permalink
Guard more changes behind sys.version; still doesn't run
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen authored and Davies Liu committed Mar 20, 2015
1 parent 1aa5e8f commit 2fb2db3
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 30 deletions.
6 changes: 5 additions & 1 deletion python/pyspark/broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
#

import os
import cPickle
import sys
if sys.version < '3':
import cPickle
else:
import pickle as cPickle
import gc
from tempfile import NamedTemporaryFile

Expand Down
79 changes: 55 additions & 24 deletions python/pyspark/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,16 @@
import itertools
if sys.version < '3':
from copy_reg import _extension_registry, _inverted_registry, _extension_cache
from types import InstanceType
from pickle import DictionaryType
else:
from copyreg import _extension_registry, _inverted_registry, _extension_cache
InstanceType = object # see http://bugs.python.org/issue8206
DictionaryType = dict
import dis
import traceback
import platform

# But see http://bugs.python.org/issue8206
try:
from types import InstanceType
except ImportError:
InstanceType = object


PyImp = platform.python_implementation()


Expand Down Expand Up @@ -106,7 +103,9 @@ def range_params(rangeobj):
Note: Only guarantees that elements of range are the same. parameters may
be different.
e.g. range(1,1) is interpreted as range(0,0); both behave the same
though w/ iteration
though w/ iteration.
NOTE: this is only used by Python 3.
"""

range_len = len(rangeobj)
Expand All @@ -117,6 +116,25 @@ def range_params(rangeobj):
return start, 1, 1
return (start, rangeobj[1] - rangeobj[0], range_len)

def xrange_params(xrangeobj):
    """Return a 3-element tuple ``(start, step, len)`` describing *xrangeobj*.

    Note: Only guarantees that the *elements* of the xrange are reproduced
    when it is rebuilt from these parameters; the parameters themselves may
    differ from the originals.  E.g. xrange(1, 1) is interpreted as
    xrange(0, 0) — both behave the same under iteration.

    NOTE: this is only used by Python 2.x.
    """
    xrange_len = len(xrangeobj)
    if not xrange_len:  # empty xrange: canonical (0, 1, 0)
        return (0, 1, 0)
    start = xrangeobj[0]
    if xrange_len == 1:  # single element: the step is irrelevant
        return (start, 1, 1)
    # Step is recovered from the gap between the first two elements.
    return (start, xrangeobj[1] - xrangeobj[0], xrange_len)

#debug variables intended for developer use:
printSerialization = False
printMemoization = False
Expand All @@ -127,7 +145,10 @@ def range_params(rangeobj):

class CloudPickler(pickle.Pickler):

dispatch = pickle._Pickler.dispatch.copy()
if sys.version < '3':
dispatch = pickle.Pickler.dispatch.copy()
else:
dispatch = pickle._Pickler.dispatch.copy()
savedForceImports = False
savedDjangoEnv = False #hack to transport django environment

Expand Down Expand Up @@ -159,7 +180,14 @@ def dump(self, obj):
def save_memoryview(self, obj):
"""Fallback for memoryview objects: pickle as the str() of the view.

NOTE(review): pickle.Pickler.save_string does not exist on Python 3,
yet this handler is registered only under Python 3 — presumably this
raises AttributeError if ever invoked; confirm (the commit message
says "still doesn't run").
"""
pickle.Pickler.save_string(self,str(obj))
dispatch[memoryview] = save_memoryview
if sys.version >= '3':
dispatch[memoryview] = save_memoryview

def save_buffer(self, obj):
"""Fallback for Python 2 `buffer` objects: pickle as the str() of the
buffer (registered only when sys.version < '3', where `buffer` and
pickle.Pickler.save_string both exist)."""
pickle.Pickler.save_string(self,str(obj))
if sys.version < '3':
dispatch[buffer] = save_buffer

#block broken objects
def save_unsupported(self, obj, pack=None):
Expand Down Expand Up @@ -187,7 +215,7 @@ def save_dict(self, obj):
self.save_reduce(_get_module_builtins, (), obj=obj)
else:
pickle.Pickler.save_dict(self, obj)
dispatch[dict] = save_dict
dispatch[DictionaryType] = save_dict


def save_module(self, obj, pack=struct.pack):
Expand Down Expand Up @@ -665,19 +693,19 @@ def save_reduce(self, func, args, state=None,
save(state)
write(pickle.BUILD)


def save_range(self, obj):
def save_xrange(self, obj):
"""Save an xrange object in python 2.5
Python 2.6 supports this natively
"""
_range_params = range_params(obj)
self.save_reduce(_build_range, _range_params)
range_params = xrange_params(obj)
self.save_reduce(_build_xrange,range_params)

#python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
try:
range(0).__reduce__()
except TypeError: #can't pickle -- use PiCloud pickler
dispatch[range] = save_range
if sys.version < '3':
#python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
try:
xrange(0).__reduce__()
except TypeError: #can't pickle -- use PiCloud pickler
dispatch[xrange] = save_xrange

def save_partial(self, obj):
"""Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
Expand Down Expand Up @@ -738,7 +766,10 @@ def save_file(self, obj):
self.save(retval) #save stringIO
self.memoize(obj)

dispatch[io.TextIOWrapper] = save_file
if sys.version >= '3':
dispatch[io.TextIOWrapper] = save_file
else:
dispatch[file] = save_file
"""Special functions for Add-on libraries"""

def inject_numpy(self):
Expand Down Expand Up @@ -895,9 +926,9 @@ def _modules_to_main(modList):
setattr(main,modname.__name__, modname)

#object generators:
def _build_range(start, step, len):
"""Built xrange explicitly"""
return range(start, start + step*len, step)
def _build_xrange(start, step, len):
"""Build xrange explicitly (only used on Python 2.x).

Inverse of xrange_params(): reconstructs an xrange covering `len`
elements starting at `start` with stride `step`.
NOTE: the parameter `len` shadows the builtin; harmless in this
one-liner since len() is not called here.
"""
return xrange(start, start + step*len, step)

def _genpartial(func, args, kwds):
if not args:
Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@

__all__ = ['SparkConf']

import sys
# TODO: this is a hack
if sys.version >= '3':
unicode = str


class SparkConf(object):

Expand Down
2 changes: 2 additions & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import os
import shutil
import sys
if sys.version >= '3':
xrange = range
from threading import Lock
from tempfile import NamedTemporaryFile

Expand Down
8 changes: 7 additions & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
from itertools import chain
import sys
if sys.version < '3':
from itertools import ifilter, imap
else:
ifilter = filter
imap = map
import operator
import sys
import shlex
Expand Down
19 changes: 15 additions & 4 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,18 @@
>>> sc.stop()
"""

import cPickle
from itertools import chain, izip, product
import sys
if sys.version < '3':
import cPickle
else:
import pickle as cPickle
from itertools import chain, product
if sys.version < '3':
from itertools import izip
else:
izip = zip
import marshal
import struct
import sys
import types
import collections
import zlib
Expand Down Expand Up @@ -377,7 +384,11 @@ def namedtuple(*args, **kwargs):
# hack the cls already generated by namedtuple
# those created in other module can be pickled as normal,
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.iteritems():
try:
module_items = sys.modules["__main__"].__dict__.iteritems()
except AttributeError:
module_items = sys.modules["__main__"].__dict__.items()
for n, o in module_items:
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
Expand Down

0 comments on commit 2fb2db3

Please sign in to comment.