diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 389bee7eee6e9..40e91a2d0655d 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -9,10 +9,10 @@
 It does not include an unpickler, as standard python unpickling suffices.
 
 This module was extracted from the `cloud` package, developed by `PiCloud, Inc.
-<http://www.picloud.com>`_.
+<https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.
 
 Copyright (c) 2012, Regents of the University of California.
-Copyright (c) 2009 `PiCloud, Inc. <http://www.picloud.com>`_.
+Copyright (c) 2009 `PiCloud, Inc. <https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
@@ -42,18 +42,19 @@
 """
 from __future__ import print_function
 
-import operator
-import opcode
-import os
+import dis
+from functools import partial
+import imp
 import io
+import itertools
+import logging
+import opcode
+import operator
 import pickle
 import struct
 import sys
-import types
-from functools import partial
-import itertools
-import dis
 import traceback
+import types
 import weakref
 
 from pyspark.util import _exception_message
@@ -71,6 +72,92 @@
     from io import BytesIO as StringIO
     PY3 = True
 
+
+def _make_cell_set_template_code():
+    """Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF
+
+    Notes
+    -----
+    In Python 3, we could use an easier function:
+
+    .. code-block:: python
+
+       def f():
+           cell = None
+
+           def _stub(value):
+               nonlocal cell
+               cell = value
+
+           return _stub
+
+       _cell_set_template_code = f()
+
+    This function is _only_ a LOAD_FAST(arg); STORE_DEREF, but that is
+    invalid syntax on Python 2. If we use this function we also don't need
+    to do the weird freevars/cellvars swap below
+    """
+    def inner(value):
+        lambda: cell  # make ``cell`` a closure so that we get a STORE_DEREF
+        cell = value
+
+    co = inner.__code__
+
+    # NOTE: we are marking the cell variable as a free variable intentionally
+    # so that we simulate an inner function instead of the outer function. This
+    # is what gives us the ``nonlocal`` behavior in a Python 2 compatible way.
+    if not PY3:
+        return types.CodeType(
+            co.co_argcount,
+            co.co_nlocals,
+            co.co_stacksize,
+            co.co_flags,
+            co.co_code,
+            co.co_consts,
+            co.co_names,
+            co.co_varnames,
+            co.co_filename,
+            co.co_name,
+            co.co_firstlineno,
+            co.co_lnotab,
+            co.co_cellvars,  # this is the trickery
+            (),
+        )
+    else:
+        return types.CodeType(
+            co.co_argcount,
+            co.co_kwonlyargcount,
+            co.co_nlocals,
+            co.co_stacksize,
+            co.co_flags,
+            co.co_code,
+            co.co_consts,
+            co.co_names,
+            co.co_varnames,
+            co.co_filename,
+            co.co_name,
+            co.co_firstlineno,
+            co.co_lnotab,
+            co.co_cellvars,  # this is the trickery
+            (),
+        )
+
+
+_cell_set_template_code = _make_cell_set_template_code()
+
+
+def cell_set(cell, value):
+    """Set the value of a closure cell.
+ """ + return types.FunctionType( + _cell_set_template_code, + {}, + '_cell_set_inner', + (), + (cell,), + )(value) + + #relevant opcodes STORE_GLOBAL = opcode.opmap['STORE_GLOBAL'] DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL'] @@ -161,6 +248,7 @@ def dump(self, obj): print_exec(sys.stderr) raise pickle.PicklingError(msg) + def save_memoryview(self, obj): """Fallback to save_string""" Pickler.save_string(self, str(obj)) @@ -186,8 +274,22 @@ def save_module(self, obj): """ Save a module as an import """ + mod_name = obj.__name__ + # If module is successfully found then it is not a dynamically created module + if hasattr(obj, '__file__'): + is_dynamic = False + else: + try: + _find_module(mod_name) + is_dynamic = False + except ImportError: + is_dynamic = True + self.modules.add(obj) - self.save_reduce(subimport, (obj.__name__,), obj=obj) + if is_dynamic: + self.save_reduce(dynamic_subimport, (obj.__name__, vars(obj)), obj=obj) + else: + self.save_reduce(subimport, (obj.__name__,), obj=obj) dispatch[types.ModuleType] = save_module def save_codeobject(self, obj): @@ -241,11 +343,32 @@ def save_function(self, obj, name=None): if getattr(themodule, name, None) is obj: return self.save_global(obj, name) + # a builtin_function_or_method which comes in as an attribute of some + # object (e.g., object.__new__, itertools.chain.from_iterable) will end + # up with modname "__main__" and so end up here. But these functions + # have no __code__ attribute in CPython, so the handling for + # user-defined functions below will fail. + # So we pickle them here using save_reduce; have to do it differently + # for different python versions. + if not hasattr(obj, '__code__'): + if PY3: + if sys.version_info < (3, 4): + raise pickle.PicklingError("Can't pickle %r" % obj) + else: + rv = obj.__reduce_ex__(self.proto) + else: + if hasattr(obj, '__self__'): + rv = (getattr, (obj.__self__, name)) + else: + raise pickle.PicklingError("Can't pickle %r" % obj) + return Pickler.save_reduce(self, obj=obj, *rv) + # if func is lambda, def'ed at prompt, is in main, or is nested, then # we'll pickle the actual function object rather than simply saving a # reference (as is done in default pickler), via save_function_tuple. - if islambda(obj) or obj.__code__.co_filename == '' or themodule is None: - #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule) + if (islambda(obj) + or getattr(obj.__code__, 'co_filename', None) == '' + or themodule is None): self.save_function_tuple(obj) return else: @@ -267,6 +390,97 @@ def save_function(self, obj, name=None): self.memoize(obj) dispatch[types.FunctionType] = save_function + def _save_subimports(self, code, top_level_dependencies): + """ + Ensure de-pickler imports any package child-modules that + are needed by the function + """ + # check if any known dependency is an imported package + for x in top_level_dependencies: + if isinstance(x, types.ModuleType) and hasattr(x, '__package__') and x.__package__: + # check if the package has any currently loaded sub-imports + prefix = x.__name__ + '.' + for name, module in sys.modules.items(): + # Older versions of pytest will add a "None" module to sys.modules. 
+                    if name is not None and name.startswith(prefix):
+                        # check whether the function can address the sub-module
+                        tokens = set(name[len(prefix):].split('.'))
+                        if not tokens - set(code.co_names):
+                            # ensure unpickler executes this import
+                            self.save(module)
+                            # then discards the reference to it
+                            self.write(pickle.POP)
+
+    def save_dynamic_class(self, obj):
+        """
+        Save a class that can't be stored as module global.
+
+        This method is used to serialize classes that are defined inside
+        functions, or that otherwise can't be serialized as attribute lookups
+        from global modules.
+        """
+        clsdict = dict(obj.__dict__)  # copy dict proxy to a dict
+        if not isinstance(clsdict.get('__dict__', None), property):
+            # don't extract dict that are properties
+            clsdict.pop('__dict__', None)
+        clsdict.pop('__weakref__', None)
+
+        # hack as __new__ is stored differently in the __dict__
+        new_override = clsdict.get('__new__', None)
+        if new_override:
+            clsdict['__new__'] = obj.__new__
+
+        # namedtuple is a special case for Spark where we use the _load_namedtuple function
+        if getattr(obj, '_is_namedtuple_', False):
+            self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
+            return
+
+        save = self.save
+        write = self.write
+
+        # We write pickle instructions explicitly here to handle the
+        # possibility that the type object participates in a cycle with its own
+        # __dict__. We first write an empty "skeleton" version of the class and
+        # memoize it before writing the class' __dict__ itself. We then write
+        # instructions to "rehydrate" the skeleton class by restoring the
+        # attributes from the __dict__.
+        #
+        # A type can appear in a cycle with its __dict__ if an instance of the
+        # type appears in the type's __dict__ (which happens for the stdlib
+        # Enum class), or if the type defines methods that close over the name
+        # of the type, (which is common for Python 2-style super() calls).
+
+        # Push the rehydration function.
+        save(_rehydrate_skeleton_class)
+
+        # Mark the start of the args for the rehydration function.
+        write(pickle.MARK)
+
+        # On PyPy, __doc__ is a readonly attribute, so we need to include it in
+        # the initial skeleton class.  This is safe because we know that the
+        # doc can't participate in a cycle with the original class.
+        doc_dict = {'__doc__': clsdict.pop('__doc__', None)}
+
+        # Create and memoize an empty class with obj's name and bases.
+        save(type(obj))
+        save((
+            obj.__name__,
+            obj.__bases__,
+            doc_dict,
+        ))
+        write(pickle.REDUCE)
+        self.memoize(obj)
+
+        # Now save the rest of obj's __dict__. Any references to obj
+        # encountered while saving will point to the skeleton class.
+        save(clsdict)
+
+        # Write a tuple of (skeleton_class, clsdict).
+        write(pickle.TUPLE)
+
+        # Call _rehydrate_skeleton_class(skeleton_class, clsdict)
+        write(pickle.REDUCE)
+
     def save_function_tuple(self, func):
         """
         Pickles an actual func object.
@@ -279,17 +493,31 @@ def save_function_tuple(self, func):
         safe, since this won't contain a ref to the func), and memoize it as
         soon as it's created.  The other stuff can then be filled in later.
""" + if is_tornado_coroutine(func): + self.save_reduce(_rebuild_tornado_coroutine, (func.__wrapped__,), + obj=func) + return + save = self.save write = self.write - code, f_globals, defaults, closure, dct, base_globals = self.extract_func_data(func) + code, f_globals, defaults, closure_values, dct, base_globals = self.extract_func_data(func) save(_fill_function) # skeleton function updater write(pickle.MARK) # beginning of tuple that _fill_function expects + self._save_subimports( + code, + itertools.chain(f_globals.values(), closure_values or ()), + ) + # create a skeleton function object and memoize it save(_make_skel_func) - save((code, closure, base_globals)) + save(( + code, + len(closure_values) if closure_values is not None else -1, + base_globals, + )) write(pickle.REDUCE) self.memoize(func) @@ -298,6 +526,7 @@ def save_function_tuple(self, func): save(defaults) save(dct) save(func.__module__) + save(closure_values) write(pickle.TUPLE) write(pickle.REDUCE) # applies _fill_function on the tuple @@ -335,7 +564,7 @@ def extract_code_globals(cls, co): def extract_func_data(self, func): """ Turn the function into a tuple of data necessary to recreate it: - code, globals, defaults, closure, dict + code, globals, defaults, closure_values, dict """ code = func.__code__ @@ -352,7 +581,11 @@ def extract_func_data(self, func): defaults = func.__defaults__ # process closure - closure = [c.cell_contents for c in func.__closure__] if func.__closure__ else [] + closure = ( + list(map(_get_cell_contents, func.__closure__)) + if func.__closure__ is not None + else None + ) # save the dict dct = func.__dict__ @@ -363,12 +596,18 @@ def extract_func_data(self, func): return (code, f_globals, defaults, closure, dct, base_globals) def save_builtin_function(self, obj): - if obj.__module__ is "__builtin__": + if obj.__module__ == "__builtin__": return self.save_global(obj) return self.save_function(obj) dispatch[types.BuiltinFunctionType] = save_builtin_function def save_global(self, obj, name=None, pack=struct.pack): + """ + Save a "global". + + The name of this method is somewhat misleading: all types get + dispatched here. 
+ """ if obj.__module__ == "__builtin__" or obj.__module__ == "builtins": if obj in _BUILTIN_TYPE_NAMES: return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) @@ -397,42 +636,7 @@ def save_global(self, obj, name=None, pack=struct.pack): typ = type(obj) if typ is not obj and isinstance(obj, (type, types.ClassType)): - d = dict(obj.__dict__) # copy dict proxy to a dict - if not isinstance(d.get('__dict__', None), property): - # don't extract dict that are properties - d.pop('__dict__', None) - d.pop('__weakref__', None) - - # hack as __new__ is stored differently in the __dict__ - new_override = d.get('__new__', None) - if new_override: - d['__new__'] = obj.__new__ - - # workaround for namedtuple (hijacked by PySpark) - if getattr(obj, '_is_namedtuple_', False): - self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields)) - return - - self.save(_load_class) - self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj) - d.pop('__doc__', None) - # handle property and staticmethod - dd = {} - for k, v in d.items(): - if isinstance(v, property): - k = ('property', k) - v = (v.fget, v.fset, v.fdel, v.__doc__) - elif isinstance(v, staticmethod) and hasattr(v, '__func__'): - k = ('staticmethod', k) - v = v.__func__ - elif isinstance(v, classmethod) and hasattr(v, '__func__'): - k = ('classmethod', k) - v = v.__func__ - dd[k] = v - self.save(dd) - self.write(pickle.TUPLE2) - self.write(pickle.REDUCE) - + self.save_dynamic_class(obj) else: raise pickle.PicklingError("Can't pickle %r" % obj) @@ -441,18 +645,26 @@ def save_global(self, obj, name=None, pack=struct.pack): def save_instancemethod(self, obj): # Memoization rarely is ever useful due to python bounding - if PY3: - self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) + if obj.__self__ is None: + self.save_reduce(getattr, (obj.im_class, obj.__name__)) else: - self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__), - obj=obj) + if PY3: + self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) + else: + self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__), + obj=obj) dispatch[types.MethodType] = save_instancemethod def save_inst(self, obj): - """Inner logic to save instance. Based off pickle.save_inst - Supports __transient__""" + """Inner logic to save instance. 
Based off pickle.save_inst""" cls = obj.__class__ + # Try the dispatch table (pickle module doesn't do it) + f = self.dispatch.get(cls) + if f: + f(self, obj) # Call unbound method with explicit self + return + memo = self.memo write = self.write save = self.save @@ -482,13 +694,6 @@ def save_inst(self, obj): getstate = obj.__getstate__ except AttributeError: stuff = obj.__dict__ - #remove items if transient - if hasattr(obj, '__transient__'): - transient = obj.__transient__ - stuff = stuff.copy() - for k in list(stuff.keys()): - if k in transient: - del stuff[k] else: stuff = getstate() pickle._keep_alive(stuff, memo) @@ -503,6 +708,17 @@ def save_property(self, obj): self.save_reduce(property, (obj.fget, obj.fset, obj.fdel, obj.__doc__), obj=obj) dispatch[property] = save_property + def save_classmethod(self, obj): + try: + orig_func = obj.__func__ + except AttributeError: # Python 2.6 + orig_func = obj.__get__(None, object) + if isinstance(obj, classmethod): + orig_func = orig_func.__func__ # Unbind + self.save_reduce(type(obj), (orig_func,), obj=obj) + dispatch[classmethod] = save_classmethod + dispatch[staticmethod] = save_classmethod + def save_itemgetter(self, obj): """itemgetter serializer (needed for namedtuple support)""" class Dummy: @@ -540,8 +756,6 @@ def __getattribute__(self, item): def save_reduce(self, func, args, state=None, listitems=None, dictitems=None, obj=None): - """Modified to support __transient__ on new objects - Change only affects protocol level 2 (which is always used by PiCloud""" # Assert that args is a tuple or None if not isinstance(args, tuple): raise pickle.PicklingError("args from reduce() should be a tuple") @@ -555,7 +769,6 @@ def save_reduce(self, func, args, state=None, # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__": - #Added fix to allow transient cls = args[0] if not hasattr(cls, "__new__"): raise pickle.PicklingError( @@ -566,15 +779,6 @@ def save_reduce(self, func, args, state=None, args = args[1:] save(cls) - #Don't pickle transient entries - if hasattr(obj, '__transient__'): - transient = obj.__transient__ - state = state.copy() - - for k in list(state.keys()): - if k in transient: - del state[k] - save(args) write(pickle.NEWOBJ) else: @@ -623,72 +827,82 @@ def save_file(self, obj): return self.save_reduce(getattr, (sys,'stderr'), obj=obj) if obj is sys.stdin: raise pickle.PicklingError("Cannot pickle standard input") - if hasattr(obj, 'isatty') and obj.isatty(): + if obj.closed: + raise pickle.PicklingError("Cannot pickle closed files") + if hasattr(obj, 'isatty') and obj.isatty(): raise pickle.PicklingError("Cannot pickle files that map to tty objects") - if 'r' not in obj.mode: - raise pickle.PicklingError("Cannot pickle files that are not opened for reading") + if 'r' not in obj.mode and '+' not in obj.mode: + raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode) + name = obj.name - try: - fsize = os.stat(name).st_size - except OSError: - raise pickle.PicklingError("Cannot pickle file %s as it cannot be stat" % name) - if obj.closed: - #create an empty closed string io - retval = pystringIO.StringIO("") - retval.close() - elif not fsize: #empty file - retval = pystringIO.StringIO("") - try: - tmpfile = file(name) - tst = tmpfile.read(1) - except IOError: - raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) - tmpfile.close() - if tst != '': - raise pickle.PicklingError("Cannot 
-                raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name)
-        else:
-            try:
-                tmpfile = file(name)
-                contents = tmpfile.read()
-                tmpfile.close()
-            except IOError:
-                raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name)
-            retval = pystringIO.StringIO(contents)
+        retval = pystringIO.StringIO()
+
+        try:
+            # Read the whole file
             curloc = obj.tell()
-            retval.seek(curloc)
+            obj.seek(0)
+            contents = obj.read()
+            obj.seek(curloc)
+        except IOError:
+            raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name)
+        retval.write(contents)
+        retval.seek(curloc)
 
         retval.name = name
         self.save(retval)
         self.memoize(obj)
 
+    def save_ellipsis(self, obj):
+        self.save_reduce(_gen_ellipsis, ())
+
+    def save_not_implemented(self, obj):
+        self.save_reduce(_gen_not_implemented, ())
+
     if PY3:
         dispatch[io.TextIOWrapper] = save_file
     else:
         dispatch[file] = save_file
 
-    """Special functions for Add-on libraries"""
+    dispatch[type(Ellipsis)] = save_ellipsis
+    dispatch[type(NotImplemented)] = save_not_implemented
 
-    def inject_numpy(self):
-        numpy = sys.modules.get('numpy')
-        if not numpy or not hasattr(numpy, 'ufunc'):
-            return
-        self.dispatch[numpy.ufunc] = self.__class__.save_ufunc
-
-    def save_ufunc(self, obj):
-        """Hack function for saving numpy ufunc objects"""
-        name = obj.__name__
-        numpy_tst_mods = ['numpy', 'scipy.special']
-        for tst_mod_name in numpy_tst_mods:
-            tst_mod = sys.modules.get(tst_mod_name, None)
-            if tst_mod and name in tst_mod.__dict__:
-                return self.save_reduce(_getobject, (tst_mod_name, name))
-        raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in'
-                                   % str(obj))
+    # WeakSet was added in 2.7.
+    if hasattr(weakref, 'WeakSet'):
+        def save_weakset(self, obj):
+            self.save_reduce(weakref.WeakSet, (list(obj),))
+        dispatch[weakref.WeakSet] = save_weakset
+
+    """Special functions for Add-on libraries"""
 
     def inject_addons(self):
        """Plug in system. Register additional pickling functions if modules already loaded"""
-        self.inject_numpy()
+        pass
+
+    def save_logger(self, obj):
+        self.save_reduce(logging.getLogger, (obj.name,), obj=obj)
+
+    dispatch[logging.Logger] = save_logger
+
+
+# Tornado support
+
+def is_tornado_coroutine(func):
+    """
+    Return whether *func* is a Tornado coroutine function.
+    Running coroutines are not supported.
+ """ + if 'tornado.gen' not in sys.modules: + return False + gen = sys.modules['tornado.gen'] + if not hasattr(gen, "is_coroutine_function"): + # Tornado version is too old + return False + return gen.is_coroutine_function(func) + +def _rebuild_tornado_coroutine(func): + from tornado import gen + return gen.coroutine(func) # Shorthands for legacy support @@ -705,6 +919,10 @@ def dumps(obj, protocol=2): return file.getvalue() +# including pickles unloading functions in this namespace +load = pickle.load +loads = pickle.loads + #hack for __import__ not working as desired def subimport(name): @@ -712,6 +930,12 @@ def subimport(name): return sys.modules[name] +def dynamic_subimport(name, vars): + mod = imp.new_module(name) + mod.__dict__.update(vars) + sys.modules[name] = mod + return mod + # restores function attributes def _restore_attr(obj, attr): for key, val in attr.items(): @@ -755,59 +979,114 @@ def _genpartial(func, args, kwds): kwds = {} return partial(func, *args, **kwds) +def _gen_ellipsis(): + return Ellipsis -def _fill_function(func, globals, defaults, dict, module): +def _gen_not_implemented(): + return NotImplemented + + +def _get_cell_contents(cell): + try: + return cell.cell_contents + except ValueError: + # sentinel used by ``_fill_function`` which will leave the cell empty + return _empty_cell_value + + +def instance(cls): + """Create a new instance of a class. + + Parameters + ---------- + cls : type + The class to create an instance of. + + Returns + ------- + instance : cls + A new instance of ``cls``. + """ + return cls() + + +@instance +class _empty_cell_value(object): + """sentinel for empty closures + """ + @classmethod + def __reduce__(cls): + return cls.__name__ + + +def _fill_function(func, globals, defaults, dict, module, closure_values): """ Fills in the rest of function data into the skeleton function object that were created via _make_skel_func(). - """ + """ func.__globals__.update(globals) func.__defaults__ = defaults func.__dict__ = dict func.__module__ = module - return func + cells = func.__closure__ + if cells is not None: + for cell, value in zip(cells, closure_values): + if value is not _empty_cell_value: + cell_set(cell, value) + return func -def _make_cell(value): - return (lambda: value).__closure__[0] +def _make_empty_cell(): + if False: + # trick the compiler into creating an empty cell in our lambda + cell = None + raise AssertionError('this route should not be executed') -def _reconstruct_closure(values): - return tuple([_make_cell(v) for v in values]) + return (lambda: cell).__closure__[0] -def _make_skel_func(code, closures, base_globals = None): +def _make_skel_func(code, cell_count, base_globals=None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other func attributes (e.g. func_globals) are empty. """ - closure = _reconstruct_closure(closures) if closures else None - if base_globals is None: base_globals = {} base_globals['__builtins__'] = __builtins__ - return types.FunctionType(code, base_globals, - None, None, closure) + closure = ( + tuple(_make_empty_cell() for _ in range(cell_count)) + if cell_count >= 0 else + None + ) + return types.FunctionType(code, base_globals, None, None, closure) -def _load_class(cls, d): - """ - Loads additional properties into class `cls`. +def _rehydrate_skeleton_class(skeleton_class, class_dict): + """Put attributes from `class_dict` back on `skeleton_class`. + + See CloudPickler.save_dynamic_class for more info. 
""" - for k, v in d.items(): - if isinstance(k, tuple): - typ, k = k - if typ == 'property': - v = property(*v) - elif typ == 'staticmethod': - v = staticmethod(v) - elif typ == 'classmethod': - v = classmethod(v) - setattr(cls, k, v) - return cls + for attrname, attr in class_dict.items(): + setattr(skeleton_class, attrname, attr) + return skeleton_class +def _find_module(mod_name): + """ + Iterate over each part instead of calling imp.find_module directly. + This function is able to find submodules (e.g. sickit.tree) + """ + path = None + for part in mod_name.split('.'): + if path is not None: + path = [path] + file, path, description = imp.find_module(part, path) + if file is not None: + file.close() + return path, description + def _load_namedtuple(name, fields): """ Loads a class generated by namedtuple @@ -815,10 +1094,24 @@ def _load_namedtuple(name, fields): from collections import namedtuple return namedtuple(name, fields) - """Constructors for 3rd party libraries Note: These can never be renamed due to client compatibility issues""" def _getobject(modname, attribute): mod = __import__(modname, fromlist=[attribute]) return mod.__dict__[attribute] + + +""" Use copy_reg to extend global pickle definitions """ + +if sys.version_info < (3, 4): + method_descriptor = type(str.upper) + + def _reduce_method_descriptor(obj): + return (getattr, (obj.__objclass__, obj.__name__)) + + try: + import copy_reg as copyreg + except ImportError: + import copyreg + copyreg.pickle(method_descriptor, _reduce_method_descriptor)