diff --git a/.travis.yml b/.travis.yml index bbca21a8c..060d0002e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,7 +27,7 @@ env: - SETUP_CMD='test -a "--boxed"' - NUMPY_VERSION=1.13 - ASTROPY_VERSION=stable - - CONDA_DEPENDENCIES='matplotlib aplpy pytest pytest-xdist astropy-helpers' + - CONDA_DEPENDENCIES='matplotlib aplpy pytest pytest-xdist astropy-helpers joblib' - CONDA_CHANNELS='astropy-ci-extras astropy' - PIP_DEPENDENCIES='https://github.com/radio-astro-tools/pvextractor/archive/master.zip radio_beam' - SETUP_XVFB=True @@ -45,7 +45,7 @@ matrix: # make sure *everything* is run for coverage tests - python: 3.6 env: SETUP_CMD='test --coverage' - CONDA_DEPENDENCIES='matplotlib aplpy yt bottleneck pytest pytest-xdist astropy-helpers' + CONDA_DEPENDENCIES='matplotlib aplpy yt bottleneck pytest pytest-xdist astropy-helpers joblib' # Check for sphinx doc build warnings - we do this first because it # may run for a long time diff --git a/CHANGES.rst b/CHANGES.rst index 2dd2a65f3..648e535a3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,10 @@ 0.4.3 (unreleased) ------------------ - - none yet + - Refactor spectral smoothing tools to allow parallelized application *and* + memory mapped output (to avoid loading cube into memory). Created + ``apply_function_parallel_spectral`` to make this general. Added + ``joblib`` as a dependency. + (https://github.com/radio-astro-tools/spectral-cube/pull/474) 0.4.2 (2018-02-21) ------------------ diff --git a/pip-requirements b/pip-requirements new file mode 100644 index 000000000..aa89daa66 --- /dev/null +++ b/pip-requirements @@ -0,0 +1,3 @@ +numpy>=1.9 +astropy>=1.0 +joblib diff --git a/spectral_cube/cube_utils.py b/spectral_cube/cube_utils.py index 5b043e024..f058fb770 100644 --- a/spectral_cube/cube_utils.py +++ b/spectral_cube/cube_utils.py @@ -397,27 +397,26 @@ def _map_context(numcores): """ Mapping context manager to allow parallel mapping or regular mapping depending on the number of cores specified. + + The builtin map is overloaded to handle python3 problems: python3 returns a + generator, while ``multiprocessing.Pool.map`` actually runs the whole thing """ if numcores is not None and numcores > 1: try: - import multiprocessing - p = multiprocessing.Pool(processes=numcores) - map = p.map + from joblib import Parallel, delayed + from joblib.pool import has_shareable_memory + map = lambda x,y: Parallel(n_jobs=numcores)(delayed(has_shareable_memory)(x))(y) parallel = True except ImportError: - map = builtins.map - warnings.warn("Could not import multiprocessing. " + map = lambda x,y: list(builtins.map(x,y)) + warnings.warn("Could not import joblib. " "map will be non-parallel.") parallel = False else: parallel = False - map = builtins.map + map = lambda x,y: list(builtins.map(x,y)) - try: - yield map - finally: - if parallel: - p.close() + yield map def convert_bunit(bunit): diff --git a/spectral_cube/spectral_cube.py b/spectral_cube/spectral_cube.py index 164fd75a8..7efa54fe8 100644 --- a/spectral_cube/spectral_cube.py +++ b/spectral_cube/spectral_cube.py @@ -10,6 +10,7 @@ import re import itertools import copy +import tempfile import astropy.wcs from astropy import units as u @@ -87,6 +88,20 @@ def wrapper(*args, **kwargs): wrapper.__doc__ += _NP_DOC return wrapper +def _apply_function(arguments, outcube, function, **kwargs): + """ + Helper function to apply a function to a spectrum. + Needs to be declared toward the top of the code to allow pickling by + joblib. + """ + (spec, includemask, ii, jj) = arguments + + if any(includemask): + outcube[:,jj,ii] = function(spec, **kwargs) + else: + outcube[:,jj,ii] = spec + + # convenience structures to keep track of the reversed index # conventions between WCS and numpy np2wcs = {2: 0, 1: 1, 0: 2} @@ -778,7 +793,7 @@ def _cube_on_cube_operation(self, function, cube, equivalencies=[], **kwargs): return self._new_cube_with(data=data, unit=unit) def apply_function(self, function, axis=None, weights=None, unit=None, - projection=False, progressbar=False, + projection=False, progressbar=False, update_function=None, keep_shape=False, **kwargs): """ Apply a function to valid data along the specified axis or to the whole @@ -2569,7 +2584,11 @@ def _gsmooth_image(args): return newcube - def spectral_smooth_median(self, ksize, update_function=None, **kwargs): + def spectral_smooth_median(self, ksize, + use_memmap=True, + verbose=0, + num_cores=None, + **kwargs): """ Smooth the cube along the spectral dimension @@ -2577,72 +2596,138 @@ def spectral_smooth_median(self, ksize, update_function=None, **kwargs): ---------- ksize : int Size of the median filter (scipy.ndimage.filters.median_filter) - update_function : method - Method that is called to update an external progressbar - If provided, it disables the default `astropy.utils.console.ProgressBar` + verbose : int + Verbosity level to pass to joblib + use_memmap : bool + If specified, a memory mapped temporary file on disk will be + written to rather than storing the intermediate spectra in memory. + num_cores : int or None + The number of cores to use if running in parallel kwargs : dict - Passed to the convolve function + Not used at the moment. """ if not scipyOK: raise ImportError("Scipy could not be imported: this function won't work.") + return self._apply_function_parallel_spectral(ndimage.filters.median_filter, + data=self.filled_data, + size=ksize, + num_cores=num_cores, + use_memmap=use_memmap, + **kwargs) + + def _apply_function_parallel_spectral(self, + function, + data, + num_cores=None, + verbose=0, + use_memmap=True, + parallel=True, + **kwargs + ): + """ + Apply a function in parallel along the spectral dimension. The + function will be performed on data with masked values replaced with the + cube's fill value. + + Parameters + ---------- + function : function + The function to apply in the spectral dimension. It must take + two arguments: an array representing a spectrum and a boolean array + representing the mask. It may also accept **kwargs. The function + must return an object with the same shape as the input spectrum. + verbose : int + Verbosity level to pass to joblib + use_memmap : bool + If specified, a memory mapped temporary file on disk will be + written to rather than storing the intermediate spectra in memory. + num_cores : int or None + The number of cores to use if running in parallel + parallel : bool + If set to ``False``, will force the use of a single core without + using ``joblib``. + kwargs : dict + Passed to ``function`` + """ shape = self.shape - # "cubelist" is a generator - # the boolean check will skip smoothing for bad spectra + # 'spectra' is a generator + # the boolean check will skip the function for bad spectra # TODO: should spatial good/bad be cached? - cubelist = ((self.filled_data[:,jj,ii], - self.mask.include(view=(slice(None), jj, ii))) - for jj in range(self.shape[1]) - for ii in range(self.shape[2])) - - if update_function is None: - pb = ProgressBar(shape[1] * shape[2]) - update_function = pb.update - - def _gsmooth_spectrum(args): - """ - Helper function to smooth a spectrum - """ - (spec, includemask),kwargs = args - update_function() + spectra = ((data[:,jj,ii], + self.mask.include(view=(slice(None), jj, ii)), + ii, jj, + ) + for jj in range(shape[1]) + for ii in range(shape[2])) - if any(includemask): - return ndimage.filters.median_filter(spec, size=ksize) + if use_memmap: + ntf = tempfile.NamedTemporaryFile() + outcube = np.memmap(ntf, mode='w+', shape=shape, dtype=np.float) + else: + if self._is_huge and not self.allow_huge_operations: + raise ValueError("Applying a function without ``use_memmap`` " + "requires loading the whole array into " + "memory *twice*, which can overload the " + "machine's memory for large cubes. Either " + "set ``use_memmap=True`` or set " + "``cube.allow_huge_operations=True`` to " + "override this restriction.") + outcube = np.empty(shape=shape, dtype=np.float) + + if parallel and use_memmap: + # it is not possible to run joblib parallelization without memmap + try: + from joblib import Parallel, delayed + + Parallel(n_jobs=num_cores, + verbose=verbose, + max_nbytes=None)(delayed(_apply_function)(arg, + outcube, + function, + **kwargs) + for arg in spectra) + except ImportError: + if num_cores is not None and num_cores > 1: + warnings.warn("Could not import joblib. Will run in serial.", + ImportError) + parallel = False + + # this isn't an else statement because we want to catch the case where + # the above clause fails on ImportError + if not parallel or not use_memmap: + if verbose > 0: + progressbar = ProgressBar(self.shape[1]*self.shape[2]) + pbu = progressbar.update else: - return spec + pbu = object - # could be numcores, except _gsmooth_spectrum is unpicklable - with cube_utils._map_context(1) as map: - smoothcube_ = np.array([x for x in - map(_gsmooth_spectrum, zip(cubelist, - itertools.cycle([kwargs]), - ) - ) - ] - ) + for arg in spectra: + _apply_function(arg, outcube, function, **kwargs) + pbu() - # empirical: need to swapaxes to get shape right - # cube = np.arange(6*5*4).reshape([4,5,6]).swapaxes(0,2) - # cubelist.T.reshape(cube.shape) == cube - smoothcube = smoothcube_.T.reshape(shape) # TODO: do something about the mask? - newcube = self._new_cube_with(data=smoothcube, wcs=self.wcs, + newcube = self._new_cube_with(data=outcube, wcs=self.wcs, mask=self.mask, meta=self.meta, fill_value=self.fill_value) return newcube + def spectral_smooth(self, kernel, - #numcores=None, convolve=convolution.convolve, - update_function=None, + verbose=0, + use_memmap=True, + num_cores=None, **kwargs): """ Smooth the cube along the spectral dimension + Note that the mask is left unchanged in this operation. + Parameters ---------- kernel : `~astropy.convolution.Kernel1D` @@ -2651,61 +2736,25 @@ def spectral_smooth(self, kernel, The astropy convolution function to use, either `astropy.convolution.convolve` or `astropy.convolution.convolve_fft` - update_function : method - Method that is called to update an external progressbar - If provided, it disables the default `astropy.utils.console.ProgressBar` + verbose : int + Verbosity level to pass to joblib + use_memmap : bool + If specified, a memory mapped temporary file on disk will be + written to rather than storing the intermediate spectra in memory. + num_cores : int or None + The number of cores to use if running in parallel kwargs : dict Passed to the convolve function """ - shape = self.shape - - # "cubelist" is a generator - # the boolean check will skip smoothing for bad spectra - # TODO: should spatial good/bad be cached? - cubelist = ((self.filled_data[:,jj,ii], - self.mask.include(view=(slice(None), jj, ii))) - for jj in range(self.shape[1]) - for ii in range(self.shape[2])) - - if update_function is None: - pb = ProgressBar(shape[1] * shape[2]) - update_function = pb.update - - def _gsmooth_spectrum(args): - """ - Helper function to smooth a spectrum - """ - (spec, includemask),kernel,kwargs = args - update_function() - - if any(includemask): - return convolve(spec, kernel, normalize_kernel=True, **kwargs) - else: - return spec - - # could be numcores, except _gsmooth_spectrum is unpicklable - with cube_utils._map_context(1) as map: - smoothcube_ = np.array([x for x in - map(_gsmooth_spectrum, zip(cubelist, - itertools.cycle([kernel]), - itertools.cycle([kwargs]), - ) - ) - ] - ) - - # empirical: need to swapaxes to get shape right - # cube = np.arange(6*5*4).reshape([4,5,6]).swapaxes(0,2) - # cubelist.T.reshape(cube.shape) == cube - smoothcube = smoothcube_.T.reshape(shape) - - # TODO: do something about the mask? - newcube = self._new_cube_with(data=smoothcube, wcs=self.wcs, - mask=self.mask, meta=self.meta, - fill_value=self.fill_value) - - return newcube + return self._apply_function_parallel_spectral(convolve, + data=self.filled_data, + kernel=kernel, + normalize_kernel=True, + num_cores=num_cores, + use_memmap=use_memmap, + verbose=verbose, + **kwargs) def spectral_interpolate(self, spectral_grid, suppress_smooth_warning=False, @@ -2769,6 +2818,7 @@ def spectral_interpolate(self, spectral_grid, warnings.warn("Input grid has too small a spacing. The data should " "be smoothed prior to resampling.") + newcube = np.empty([spectral_grid.size, self.shape[1], self.shape[2]], dtype=cubedata[:1, 0, 0].dtype) newmask = np.empty([spectral_grid.size, self.shape[1], self.shape[2]], diff --git a/spectral_cube/tests/test_performance.py b/spectral_cube/tests/test_performance.py index add7f49bd..2ec25e3c2 100644 --- a/spectral_cube/tests/test_performance.py +++ b/spectral_cube/tests/test_performance.py @@ -4,9 +4,14 @@ from __future__ import print_function, absolute_import, division +import pytest + from .test_moments import moment_cube from .helpers import assert_allclose from ..spectral_cube import SpectralCube +from . import utilities + +from astropy import convolution def find_base_nbytes(obj): # from http://stackoverflow.com/questions/34637875/size-of-numpy-strided-array-broadcast-array-in-memory @@ -50,3 +55,55 @@ def test_pix_cen(): assert find_base_nbytes(s) == sc.shape[0]*bytes_per_pix assert find_base_nbytes(y) == sc.shape[1]*sc.shape[2]*bytes_per_pix assert find_base_nbytes(x) == sc.shape[1]*sc.shape[2]*bytes_per_pix + +@pytest.mark.skipif('True') +def test_parallel_performance_smoothing(): + + import timeit + + setup = 'cube,_ = utilities.generate_gaussian_cube(shape=(300,64,64))' + stmt = 'result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(20.0), num_cores={0}, use_memmap=False)' + + rslt = {} + for ncores in (1,2,3,4): + time = timeit.timeit(stmt=stmt.format(ncores), setup=setup, number=5, globals=globals()) + rslt[ncores] = time + + print() + print("memmap=False") + print(rslt) + + setup = 'cube,_ = utilities.generate_gaussian_cube(shape=(300,64,64))' + stmt = 'result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(20.0), num_cores={0}, use_memmap=True)' + + rslt = {} + for ncores in (1,2,3,4): + time = timeit.timeit(stmt=stmt.format(ncores), setup=setup, number=5, globals=globals()) + rslt[ncores] = time + + stmt = 'result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(20.0), num_cores={0}, use_memmap=True, parallel=False)' + rslt[0] = timeit.timeit(stmt=stmt.format(1), setup=setup, number=5, globals=globals()) + + print() + print("memmap=True") + print(rslt) + + + if False: + for shape in [(300,64,64), (600,64,64), (900,64,64), + (300,128,128), (300,256,256), (900,256,256)]: + + setup = 'cube,_ = utilities.generate_gaussian_cube(shape={0})'.format(shape) + stmt = 'result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(20.0), num_cores={0}, use_memmap=True)' + + rslt = {} + for ncores in (1,2,3,4): + time = timeit.timeit(stmt=stmt.format(ncores), setup=setup, number=5, globals=globals()) + rslt[ncores] = time + + stmt = 'result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(20.0), num_cores={0}, use_memmap=True, parallel=False)' + rslt[0] = timeit.timeit(stmt=stmt.format(1), setup=setup, number=5, globals=globals()) + + print() + print("memmap=True shape={0}".format(shape)) + print(rslt) diff --git a/spectral_cube/tests/test_regrid.py b/spectral_cube/tests/test_regrid.py index 9b3402c5b..5139b3b6d 100644 --- a/spectral_cube/tests/test_regrid.py +++ b/spectral_cube/tests/test_regrid.py @@ -21,6 +21,12 @@ except ImportError: REPROJECT_INSTALLED = False +try: + import joblib + JOBLIB_INSTALLED = True +except ImportError: + JOBLIB_INSTALLED = False + def test_convolution(): cube, data = cube_and_raw('255_delta.fits') @@ -107,7 +113,14 @@ def test_spectral_smooth(): cube, data = cube_and_raw('522_delta.fits') - result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0)) + result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0), use_memmap=False) + + np.testing.assert_almost_equal(result[:,0,0].value, + convolution.Gaussian1DKernel(1.0, + x_size=5).array, + 4) + + result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0), use_memmap=True) np.testing.assert_almost_equal(result[:,0,0].value, convolution.Gaussian1DKernel(1.0, @@ -115,6 +128,39 @@ def test_spectral_smooth(): 4) +# TODO: uncomment this when we figure out how to make it work +@pytest.mark.skipif('not JOBLIB_INSTALLED') +def test_spectral_smooth_4cores(): + + cube, data = cube_and_raw('522_delta.fits') + + + result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0), num_cores=4, use_memmap=True) + + np.testing.assert_almost_equal(result[:,0,0].value, + convolution.Gaussian1DKernel(1.0, + x_size=5).array, + 4) + + # this is one way to test non-parallel mode + result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0), num_cores=4, use_memmap=False) + + np.testing.assert_almost_equal(result[:,0,0].value, + convolution.Gaussian1DKernel(1.0, + x_size=5).array, + 4) + + # num_cores = 4 is a contradiction with parallel=False, but we want to make + # sure it does the same thing + result = cube.spectral_smooth(kernel=convolution.Gaussian1DKernel(1.0), num_cores=4, parallel=False) + + np.testing.assert_almost_equal(result[:,0,0].value, + convolution.Gaussian1DKernel(1.0, + x_size=5).array, + 4) + + + def test_spectral_smooth_fail(): cube, data = cube_and_raw('522_delta_beams.fits') diff --git a/spectral_cube/tests/test_spectral_cube.py b/spectral_cube/tests/test_spectral_cube.py index a888258e7..42d5e03b5 100644 --- a/spectral_cube/tests/test_spectral_cube.py +++ b/spectral_cube/tests/test_spectral_cube.py @@ -35,6 +35,12 @@ warnings.simplefilter('error', utils.NotImplementedWarning) warnings.simplefilter('error', utils.WCSMismatchWarning) +try: + import joblib + JOBLIB_INSTALLED = True +except ImportError: + JOBLIB_INSTALLED = False + try: import scipy.ndimage SCIPYOK = True @@ -1680,6 +1686,18 @@ def test_spectral_smooth_median(): np.testing.assert_almost_equal(cube_spectral_median[:,1,1].value, result) +@pytest.mark.skipif('not SCIPYOK') +@pytest.mark.skipif('not JOBLIB_INSTALLED') +def test_spectral_smooth_median_4cores(): + cube, data = cube_and_raw('adv.fits') + + cube_spectral_median = cube.spectral_smooth_median(3, num_cores=4) + + # Check first slice + result = np.array([0.77513282, 0.35675333, 0.35675333, 0.98688694]) + + np.testing.assert_almost_equal(cube_spectral_median[:,1,1].value, result) + def test_initialization_from_units(): """ Regression test for issue 447