Skip to content

Commit

Permalink
Merge pull request #474 from keflavich/memmap_when_smoothing
Browse files Browse the repository at this point in the history
Memory mapping in smoothing operations
  • Loading branch information
keflavich committed Mar 26, 2018
2 parents cd06db6 + 8c475cc commit 8f4f3f9
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 108 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ env:
- SETUP_CMD='test -a "--boxed"'
- NUMPY_VERSION=1.13
- ASTROPY_VERSION=stable
- CONDA_DEPENDENCIES='matplotlib aplpy pytest pytest-xdist astropy-helpers'
- CONDA_DEPENDENCIES='matplotlib aplpy pytest pytest-xdist astropy-helpers joblib'
- CONDA_CHANNELS='astropy-ci-extras astropy'
- PIP_DEPENDENCIES='https://github.com/radio-astro-tools/pvextractor/archive/master.zip radio_beam'
- SETUP_XVFB=True
Expand All @@ -45,7 +45,7 @@ matrix:
# make sure *everything* is run for coverage tests
- python: 3.6
env: SETUP_CMD='test --coverage'
CONDA_DEPENDENCIES='matplotlib aplpy yt bottleneck pytest pytest-xdist astropy-helpers'
CONDA_DEPENDENCIES='matplotlib aplpy yt bottleneck pytest pytest-xdist astropy-helpers joblib'

# Check for sphinx doc build warnings - we do this first because it
# may run for a long time
Expand Down
6 changes: 5 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
0.4.3 (unreleased)
------------------
- none yet
- Refactor spectral smoothing tools to allow parallelized application *and*
memory mapped output (to avoid loading cube into memory). Created
``apply_function_parallel_spectral`` to make this general. Added
``joblib`` as a dependency.
(https://github.com/radio-astro-tools/spectral-cube/pull/474)

0.4.2 (2018-02-21)
------------------
Expand Down
3 changes: 3 additions & 0 deletions pip-requirements
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
numpy>=1.9
astropy>=1.0
joblib
21 changes: 10 additions & 11 deletions spectral_cube/cube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,27 +397,26 @@ def _map_context(numcores):
"""
Mapping context manager to allow parallel mapping or regular mapping
depending on the number of cores specified.
The builtin ``map`` is wrapped to paper over a Python 3 difference: Python 3's
``map`` returns a lazy generator, whereas ``multiprocessing.Pool.map``
evaluates eagerly and returns a fully-realized list.
"""
if numcores is not None and numcores > 1:
try:
import multiprocessing
p = multiprocessing.Pool(processes=numcores)
map = p.map
from joblib import Parallel, delayed
from joblib.pool import has_shareable_memory
map = lambda x,y: Parallel(n_jobs=numcores)(delayed(has_shareable_memory)(x))(y)
parallel = True
except ImportError:
map = builtins.map
warnings.warn("Could not import multiprocessing. "
map = lambda x,y: list(builtins.map(x,y))
warnings.warn("Could not import joblib. "
"map will be non-parallel.")
parallel = False
else:
parallel = False
map = builtins.map
map = lambda x,y: list(builtins.map(x,y))

try:
yield map
finally:
if parallel:
p.close()
yield map


def convert_bunit(bunit):
Expand Down
236 changes: 143 additions & 93 deletions spectral_cube/spectral_cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import itertools
import copy
import tempfile

import astropy.wcs
from astropy import units as u
Expand Down Expand Up @@ -87,6 +88,20 @@ def wrapper(*args, **kwargs):
wrapper.__doc__ += _NP_DOC
return wrapper

def _apply_function(arguments, outcube, function, **kwargs):
"""
Helper function to apply a function to a spectrum.
Needs to be declared toward the top of the code to allow pickling by
joblib.
"""
(spec, includemask, ii, jj) = arguments

if any(includemask):
outcube[:,jj,ii] = function(spec, **kwargs)
else:
outcube[:,jj,ii] = spec


# convenience structures to keep track of the reversed index
# conventions between WCS and numpy
np2wcs = {2: 0, 1: 1, 0: 2}
Expand Down Expand Up @@ -778,7 +793,7 @@ def _cube_on_cube_operation(self, function, cube, equivalencies=[], **kwargs):
return self._new_cube_with(data=data, unit=unit)

def apply_function(self, function, axis=None, weights=None, unit=None,
projection=False, progressbar=False,
projection=False, progressbar=False,
update_function=None, keep_shape=False, **kwargs):
"""
Apply a function to valid data along the specified axis or to the whole
Expand Down Expand Up @@ -2569,80 +2584,150 @@ def _gsmooth_image(args):

return newcube

def spectral_smooth_median(self, ksize,
                           use_memmap=True,
                           verbose=0,
                           num_cores=None,
                           **kwargs):
    """
    Smooth the cube along the spectral dimension with a median filter.

    Parameters
    ----------
    ksize : int
        Size of the median filter
        (see ``scipy.ndimage.filters.median_filter``)
    verbose : int
        Verbosity level to pass to joblib
    use_memmap : bool
        If specified, a memory mapped temporary file on disk will be
        written to rather than storing the intermediate spectra in memory.
    num_cores : int or None
        The number of cores to use if running in parallel
    kwargs : dict
        Not used at the moment.
    """

    if not scipyOK:
        raise ImportError("Scipy could not be imported: this function won't work.")

    # BUG FIX: ``verbose`` was accepted but silently dropped; forward it so
    # joblib's progress reporting actually honors the requested level.
    return self._apply_function_parallel_spectral(ndimage.filters.median_filter,
                                                  data=self.filled_data,
                                                  size=ksize,
                                                  num_cores=num_cores,
                                                  use_memmap=use_memmap,
                                                  verbose=verbose,
                                                  **kwargs)

def _apply_function_parallel_spectral(self,
                                      function,
                                      data,
                                      num_cores=None,
                                      verbose=0,
                                      use_memmap=True,
                                      parallel=True,
                                      **kwargs
                                     ):
    """
    Apply a function in parallel along the spectral dimension.  The
    function will be performed on data with masked values replaced with the
    cube's fill value.

    Parameters
    ----------
    function : function
        The function to apply in the spectral dimension.  It must take
        two arguments: an array representing a spectrum and a boolean array
        representing the mask.  It may also accept ``**kwargs``.  The
        function must return an object with the same shape as the input
        spectrum.
    verbose : int
        Verbosity level to pass to joblib
    use_memmap : bool
        If specified, a memory mapped temporary file on disk will be
        written to rather than storing the intermediate spectra in memory.
    num_cores : int or None
        The number of cores to use if running in parallel
    parallel : bool
        If set to ``False``, will force the use of a single core without
        using ``joblib``.
    kwargs : dict
        Passed to ``function``
    """
    shape = self.shape

    # 'spectra' is a generator
    # the boolean check will skip the function for bad spectra
    # TODO: should spatial good/bad be cached?
    spectra = ((data[:, jj, ii],
                self.mask.include(view=(slice(None), jj, ii)),
                ii, jj,
               )
               for jj in range(shape[1])
               for ii in range(shape[2]))

    if use_memmap:
        # Disk-backed output so joblib workers can share it without
        # loading the whole cube into memory.
        ntf = tempfile.NamedTemporaryFile()
        # BUG FIX: ``np.float`` is a deprecated alias (removed in
        # NumPy >= 1.24); the builtin ``float`` is the equivalent dtype.
        outcube = np.memmap(ntf, mode='w+', shape=shape, dtype=float)
    else:
        if self._is_huge and not self.allow_huge_operations:
            raise ValueError("Applying a function without ``use_memmap`` "
                             "requires loading the whole array into "
                             "memory *twice*, which can overload the "
                             "machine's memory for large cubes. Either "
                             "set ``use_memmap=True`` or set "
                             "``cube.allow_huge_operations=True`` to "
                             "override this restriction.")
        outcube = np.empty(shape=shape, dtype=float)

    if parallel and use_memmap:
        # it is not possible to run joblib parallelization without memmap
        try:
            from joblib import Parallel, delayed

            Parallel(n_jobs=num_cores,
                     verbose=verbose,
                     max_nbytes=None)(delayed(_apply_function)(arg,
                                                               outcube,
                                                               function,
                                                               **kwargs)
                                      for arg in spectra)
        except ImportError:
            if num_cores is not None and num_cores > 1:
                # BUG FIX: ``warnings.warn`` requires a Warning subclass
                # as its category; passing ``ImportError`` raises
                # ``TypeError`` instead of emitting a warning.
                warnings.warn("Could not import joblib. Will run in serial.",
                              ImportWarning)
            parallel = False

    # this isn't an else statement because we want to catch the case where
    # the above clause fails on ImportError
    if not parallel or not use_memmap:
        if verbose > 0:
            progressbar = ProgressBar(self.shape[1] * self.shape[2])
            pbu = progressbar.update
        else:
            # Explicit no-op callback (clearer than the old ``pbu = object``
            # trick, which "worked" only because ``object()`` is callable
            # and discards its result).
            def pbu():
                pass

        for arg in spectra:
            _apply_function(arg, outcube, function, **kwargs)
            pbu()

    # TODO: do something about the mask?
    newcube = self._new_cube_with(data=outcube, wcs=self.wcs,
                                  mask=self.mask, meta=self.meta,
                                  fill_value=self.fill_value)

    return newcube


def spectral_smooth(self, kernel,
                    #numcores=None,
                    convolve=convolution.convolve,
                    verbose=0,
                    use_memmap=True,
                    num_cores=None,
                    **kwargs):
    """
    Smooth the cube along the spectral dimension by convolving each
    spectrum with ``kernel``.

    Note that the mask is left unchanged in this operation.

    Parameters
    ----------
    kernel : `~astropy.convolution.Kernel1D`
        A 1D kernel from astropy
    convolve : function
        The astropy convolution function to use, either
        `astropy.convolution.convolve` or
        `astropy.convolution.convolve_fft`
    verbose : int
        Verbosity level to pass to joblib
    use_memmap : bool
        If specified, a memory mapped temporary file on disk will be
        written to rather than storing the intermediate spectra in memory.
    num_cores : int or None
        The number of cores to use if running in parallel
    kwargs : dict
        Passed to the convolve function
    """

    # Delegate to the shared parallel/memmapped machinery; the previous
    # per-method _gsmooth_spectrum/_map_context implementation (left as
    # unreachable residue here) is superseded by this call.
    return self._apply_function_parallel_spectral(convolve,
                                                  data=self.filled_data,
                                                  kernel=kernel,
                                                  normalize_kernel=True,
                                                  num_cores=num_cores,
                                                  use_memmap=use_memmap,
                                                  verbose=verbose,
                                                  **kwargs)

def spectral_interpolate(self, spectral_grid,
suppress_smooth_warning=False,
Expand Down Expand Up @@ -2769,6 +2818,7 @@ def spectral_interpolate(self, spectral_grid,
warnings.warn("Input grid has too small a spacing. The data should "
"be smoothed prior to resampling.")


newcube = np.empty([spectral_grid.size, self.shape[1], self.shape[2]],
dtype=cubedata[:1, 0, 0].dtype)
newmask = np.empty([spectral_grid.size, self.shape[1], self.shape[2]],
Expand Down
Loading

0 comments on commit 8f4f3f9

Please sign in to comment.