From b6b789282d707ab8589eb0f1a68cc151d3eb2b71 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Fri, 2 Aug 2024 00:20:17 -0600
Subject: [PATCH] general improvements

---
 docs/source/conf.py       |  26 +-
 pyproject.toml            |  12 +-
 solpolpy/alpha.py         |  11 +-
 solpolpy/constants.py     |  12 +-
 solpolpy/core.py          | 171 ++++++-------
 solpolpy/errors.py        |  14 +-
 solpolpy/graph.py         |  83 ++-----
 solpolpy/instruments.py   |  56 +++--
 solpolpy/plotting.py      |  54 +++--
 solpolpy/polarizers.py    | 474 ------------------------------------
 solpolpy/system.py        |  44 ++++
 solpolpy/transforms.py    | 474 ++++++++++++++++++++++++++++++++++++
 solpolpy/util.py          |  34 +++
 tests/fixtures.py         | 141 +++++++++++
 tests/test_core.py        | 175 +++----------
 tests/test_instruments.py |  14 +-
 tests/test_plotting.py    |  14 +-
 tests/test_polarizers.py  | 490 +++++++++++++++++++------------------
 18 files changed, 1193 insertions(+), 1106 deletions(-)
 delete mode 100644 solpolpy/polarizers.py
 create mode 100644 solpolpy/system.py
 create mode 100644 solpolpy/transforms.py
 create mode 100644 solpolpy/util.py
 create mode 100644 tests/fixtures.py

diff --git a/docs/source/conf.py b/docs/source/conf.py
index a11be2b..8a4b627 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -18,9 +18,9 @@
 # -- Project information -----------------------------------------------------
 from solpolpy import __version__

-project = 'solpolpy'
-copyright = '2023, PUNCH Science Operations Center'
-author = 'PUNCH Science Operations Center'
+project = "solpolpy"
+copyright = "2023, PUNCH Science Operations Center"
+author = "PUNCH Science Operations Center"

 # The full version, including alpha/beta/rc tags
 release = __version__
@@ -32,14 +32,14 @@
 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
 # ones.

-extensions = ['autoapi.extension',
-              'sphinx.ext.autodoc',
-              'sphinx.ext.napoleon',
-              'nbsphinx',
-              'IPython.sphinxext.ipython_console_highlighting']
+extensions = ["autoapi.extension",
+              "sphinx.ext.autodoc",
+              "sphinx.ext.napoleon",
+              "nbsphinx",
+              "IPython.sphinxext.ipython_console_highlighting"]

 # Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
+templates_path = ["_templates"]

 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
@@ -54,7 +54,7 @@
 #
 html_theme = "pydata_sphinx_theme"
 html_show_sourcelink = False
-html_static_path = ['_static']
+html_static_path = ["_static"]
 html_theme_options = {
     "use_edit_page_button": True,
     "icon_links": [
@@ -63,10 +63,10 @@
             "url": "https://github.com/punch-mission/solpolpy",
             "icon": "fa-brands fa-github",
             "type": "fontawesome",
-        }
+        },
     ],
     "show_nav_level": 1,
-    "show_toc_level": 3
+    "show_toc_level": 3,
 }
 html_context = {
     # "github_url": "https://github.com",  # or your GitHub Enterprise site
@@ -77,4 +77,4 @@
 }


-autoapi_dirs = ['../../solpolpy']
+autoapi_dirs = ["../../solpolpy"]
diff --git a/pyproject.toml b/pyproject.toml
index aea9df1..06c89f5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "solpolpy"
-version = "0.2.0"
+version = "0.3.0"
 authors = [
     { name="J. Marcus Hughes", email="mhughes@boulder.swri.edu"},
     { name="Matthew J. West", email="mwest@boulder.swri.edu"},
West", email="mwest@boulder.swri.edu"}, @@ -11,7 +11,7 @@ authors = [ description = "Solar polarization resolver for any instrument" readme = "README.md" license = { file="LICENSE" } -requires-python = ">=3.10" +requires-python = ">=3.11" classifiers = [ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", @@ -34,6 +34,8 @@ test = [ 'ruff' ] +dev = ["solpolpy[test]", "pre-commit"] + [build-system] requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" @@ -44,6 +46,12 @@ packages = ['solpolpy'] [tool.ruff] line-length = 120 ignore-init-module-imports = true +#lint.select = ["ALL"] +lint.ignore = ["F403"] + +[tool.ruff.lint.per-file-ignores] +"__init__.py" = ["E402"] +"tests/*.py" = ['S101'] [tool.isort] balanced_wrapping = true diff --git a/solpolpy/alpha.py b/solpolpy/alpha.py index 7f8378f..76dcedc 100644 --- a/solpolpy/alpha.py +++ b/solpolpy/alpha.py @@ -1,11 +1,11 @@ -""" Functions related to constructing an alpha array for transformation""" +"""Functions related to constructing an alpha array for transformation.""" import astropy.units as u import numpy as np def radial_north(shape): - """An alpha array oriented west + """An alpha array oriented west. Parameters ---------- @@ -18,11 +18,12 @@ def radial_north(shape): alpha array used in calculations Notes - ------ + ----- - assumes solar north is up - assumes polarizer 0 is along solar north axis - creates radial polarization map - angles increase in counterclockwise direction + """ x_size, y_size = shape x = np.arange(-x_size // 2, x_size // 2) @@ -31,5 +32,5 @@ def radial_north(shape): return np.rot90(np.fliplr(np.arctan2(yy, xx)+np.pi), k=1)*u.radian -ALPHA_FUNCTIONS = {'radial_north': radial_north, - 'zeros': np.zeros} +ALPHA_FUNCTIONS = {"radial_north": radial_north, + "zeros": np.zeros} diff --git a/solpolpy/constants.py b/solpolpy/constants.py index 2e23461..4294871 100644 --- a/solpolpy/constants.py +++ b/solpolpy/constants.py @@ -1,16 +1,6 @@ -"""Constants used in code""" +"""Constants used in code.""" import astropy.units as u -VALID_KINDS = {"mzp": [["Bm", "Bz", "Bp", "alpha"], ["Bm", "Bz", "Bp"]], - "bpb": [["B", "pB", "alpha"], ["B", "pB"]], - "btbr": [["Bt", "Br", "alpha"], ["Bt", "Br"]], - "stokes": [["Bi", "Bq", "Bu"], ["Bi", "Bq", "Bu", "alpha"]], - "bp3": [["B", "pB", "pBp", "alpha"], ["B", "pB", "pBp"]], - "bthp": [["B", "theta", "p"]], - "fourpol": [["B0", "B90", "B45", "B135"]], # fourpol comes before npol to force it instead of npol - "npol": [["B*"]], # star indicates angle in degrees, as many as desired are supported - } - # offset angles come from https://www.sciencedirect.com/science/article/pii/S0019103515003620?via%3Dihub STEREOA_OFFSET_ANGLE = 45.8 * u.degree STEREOB_OFFSET_ANGLE = -18 * u.degree diff --git a/solpolpy/core.py b/solpolpy/core.py index 2750596..9fcba5f 100644 --- a/solpolpy/core.py +++ b/solpolpy/core.py @@ -1,4 +1,4 @@ -"""Core transformation functions for solpolpy""" +"""Core transformation functions for solpolpy.""" import typing as t import astropy.units as u @@ -7,17 +7,19 @@ from ndcube import NDCollection, NDCube from solpolpy.alpha import radial_north -from solpolpy.constants import STEREOA_OFFSET_ANGLE, STEREOB_OFFSET_ANGLE, VALID_KINDS +from solpolpy.constants import STEREOA_OFFSET_ANGLE, STEREOB_OFFSET_ANGLE from solpolpy.errors import UnsupportedTransformationError -from solpolpy.graph import transform_graph +from solpolpy.graph import SYSTEM_REQUIRED_KEYS, System, transform_graph from solpolpy.instruments import load_data -def 
+@u.quantity_input
+def resolve(input_data: list[str] | NDCollection,
             out_system: str,
             imax_effect: bool = False,
-            out_angles: t.Optional[t.List[float]] = None) -> NDCollection:
-    """ Apply a polarization transformation to a set of input dataframes.
+            out_angles: u.degree = None) -> NDCollection:
+    # TODO: improve this documentation
+    """Apply a polarization transformation to a set of input dataframes.

     Parameters
     ----------
@@ -50,6 +52,7 @@ def resolve(input_data: t.Union[t.List[str], NDCollection],
     -------
     NDCollection
         The transformed data are returned as a NDCollection.
+
     """
     # standardize out_system to all lowercase
     out_system = out_system.lower()
@@ -59,45 +62,42 @@ def resolve(input_data: t.Union[t.List[str], NDCollection],

     input_kind = determine_input_kind(input_data)

-    input_key = list(input_data)
+    input_keys = list(input_data.keys())

     transform_path = get_transform_path(input_kind, out_system)
     equation = get_transform_equation(transform_path)
-    requires_alpha = check_alpha_requirement(transform_path)

     if imax_effect:
-        if input_kind == 'mzp' and out_system == 'mzp':
+        if input_kind == "mzp" and out_system == "mzp":  # TODO: does this ever get triggered?
             input_data = resolve_imax_effect(input_data)
         else:
-            raise UnsupportedTransformationError('IMAX effect applies only for MZP->MZP solpolpy transformations')
+            msg = "IMAX effect applies only for MZP->MZP transformations."
+            raise UnsupportedTransformationError(msg)

-    if requires_alpha and "alpha" not in input_key:
+    if requires_alpha(equation) and "alpha" not in input_keys:
         input_data = add_alpha(input_data)

-    result = equation(input_data,
+    return equation(input_data,
                      offset_angle=determine_offset_angle(input_data),
                      out_angles=out_angles)

-    return result


 def determine_offset_angle(input_collection: NDCollection) -> float:
-    """Get the instrument specific offset angle"""
-    if 'B0.0' in input_collection:
-        match input_collection['B0.0'].meta.get('OBSRVTRY', 'BLANK'):
-            case 'STEREO_A':
-                offset_angle = STEREOA_OFFSET_ANGLE
-            case 'STEREO_B':
-                offset_angle = STEREOB_OFFSET_ANGLE
-            case _:
-                offset_angle = 0 * u.degree
-    else:
-        offset_angle = 0 * u.degree
+    """Get the instrument specific offset angle."""
+    first_key = next(iter(input_collection.keys()))
+    match input_collection[first_key].meta.get("OBSRVTRY", "BLANK"):
+        case "STEREO_A":
+            offset_angle = STEREOA_OFFSET_ANGLE
+        case "STEREO_B":
+            offset_angle = STEREOB_OFFSET_ANGLE
+        case _:
+            offset_angle = 0 * u.degree
     return offset_angle


-def determine_input_kind(input_data: NDCollection) -> str:
-    """Determine what kind of data was input in the NDCollection
+def determine_input_kind(input_data: NDCollection) -> System:
+    """Determine what kind of data was input in the NDCollection.

     Parameters
     ----------
@@ -108,27 +108,31 @@ def determine_input_kind(input_data: NDCollection) -> str:
     -------
     str
         a valid input kind, see documentation of `resolve` for the full list under `out_system`
+
     """
-    input_keys = list(input_data)
-    for valid_kind, param_list in VALID_KINDS.items():
-        if valid_kind == "npol":
-            try:
-                key_angles = [float(k[1:]) for k in set(input_keys) if k != "alpha"]
-            except ValueError:
-                msg = "npol polarization system keys must be like BANG where ANG is any angle in degrees, e.g. B30.4"
B30.4" - raise ValueError(msg) - else: - if len(key_angles) >= 1: # must be at least one angle - return "npol" - else: - for param_option in param_list: - if set(input_keys) == set(param_option): - return valid_kind - raise ValueError("Unidentified Polarization System.") + input_keys = set(input_data) + input_keys.discard("alpha") + if len(input_keys) == 0: + msg = "Found no cubes in the `input_data` collection." + raise ValueError(msg) + + for valid_kind, param_set in SYSTEM_REQUIRED_KEYS.items(): + if valid_kind != System.npol and param_set == input_keys: + return valid_kind + try: + input_keys_quantities = [u.Quantity(key) for key in input_keys] + except TypeError: + pass + else: + if all(u.get_physical_type(q) == "angle" for q in input_keys_quantities): + return System.npol + msg = "Could not determine input transformation." + raise UnsupportedTransformationError(msg) -def get_transform_path(input_kind: str, output_kind: str) -> t.List[str]: - """Given an input and output system type, determine the require path of transforms from the transform graph + +def get_transform_path(input_kind: str, output_kind: str) -> list[str]: + """Given an input and output system type, determine the require path of transforms from the transform graph. Parameters ---------- @@ -142,16 +146,18 @@ def get_transform_path(input_kind: str, output_kind: str) -> t.List[str]: ------- List[str] a list of transformation identifiers used to convert from `input_kind` to `output_kind` + """ try: path = nx.shortest_path(transform_graph, input_kind, output_kind) except nx.exception.NetworkXNoPath: - raise UnsupportedTransformationError(f"Not possible to convert {input_kind} to {output_kind}") + msg = f"Not possible to convert {input_kind} to {output_kind}" + raise UnsupportedTransformationError(msg) return path -def get_transform_equation(path: t.List[str]) -> t.Callable: - """Given a transform path, compose the equation, i.e. the composed function of transforms that executes that path +def get_transform_equation(path: list[str]) -> t.Callable: + """Given a transform path, compose the equation, i.e. the composed function of transforms that executes that path. Parameters ---------- @@ -162,17 +168,18 @@ def get_transform_equation(path: t.List[str]) -> t.Callable: ------- Callable a function that executes the transformation + """ current_function = identity for i, step_start in enumerate(path[:-1]): step_end = path[i + 1] - current_function = _compose2(transform_graph.get_edge_data(step_start, step_end)['func'], + current_function = _compose2(transform_graph.get_edge_data(step_start, step_end)["func"], current_function) return current_function -def check_alpha_requirement(path: t.List[str]) -> bool: - """Determine if an alpha array is required for this transformation path +def requires_alpha(func: t.Callable) -> bool: + """Determine if an alpha array is required for this transformation path. 

     Parameters
     ----------
@@ -183,20 +190,16 @@
     -------
     bool
         whether alpha array is required for transformation
+
     """
-    requires_alpha = False
-    for i, step_start in enumerate(path[:-1]):
-        step_end = path[i + 1]
-        requires_alpha = transform_graph.get_edge_data(step_start, step_end)['requires_alpha'] or requires_alpha
-    return requires_alpha
+    return getattr(func, "uses_alpha", False)


 def generate_imax_matrix(array_shape) -> np.ndarray:
-    """
-    Define an A matrix with which to convert MZP^ (camera coords) = A x MZP (solar coords)
+    """Define an A matrix with which to convert MZP^ (camera coords) = A x MZP (solar coords).

     Parameters
-    -------
+    ----------
     array_shape
         Defined input WCS array shape for matrix generation

@@ -206,10 +209,11 @@
         Output A matrix used in converting between camera coordinates and solar coordinates
     """
-    # Ideal MZP wrt Solar North
+    # TODO fix variable name
     thmzp = [-60, 0, 60] * u.degree

+    # TODO don't hard code extents but get from the file
     long_arr, lat_arr = np.meshgrid(np.linspace(-20, 20, array_shape[0]),
                                     np.linspace(-20, 20, array_shape[1]))

     # Foreshortening (IMAX) effect on polarizer angle
@@ -230,11 +234,10 @@


 def resolve_imax_effect(input_data: NDCollection) -> NDCollection:
-    """
-    Resolves the IMAX effect for provided input data, correcting measured polarization angles for wide FOV imagers.
+    """Resolves the IMAX effect for provided input data, correcting measured polarization angles for wide FOV imagers.

     Parameters
-    -------
+    ----------
     input_data : NDCollection
         Input data on which to correct foreshortened polarization angles

@@ -244,21 +247,21 @@
         Output data with corrected polarization angles

     """
-
-    data_shape = input_data['Bm'].data.shape
+    data_shape = _determine_image_shape(input_data)
     data_mzp_camera = np.zeros([data_shape[0], data_shape[1], 3, 1])
-    for i, key in enumerate(['Bm', 'Bz', 'Bp']):
+    for i, key in enumerate(["M", "Z", "P"]):
         data_mzp_camera[:, :, i, 0] = input_data[key].data

     imax_matrix = generate_imax_matrix(data_shape)
     try:
         imax_matrix_inv = np.linalg.inv(imax_matrix)
     except np.linalg.LinAlgError as err:
-        if 'Singular matrix' in str(err):
-            raise ValueError('Singular IMAX effect matrix is degenerate')
+        if "Singular matrix" in str(err):
+            msg = "Singular IMAX effect matrix is degenerate"
+            raise ValueError(msg)
         else:
-            raise err
+            raise

     data_mzp_solar = np.matmul(imax_matrix_inv, data_mzp_camera)

@@ -268,8 +271,8 @@


-def _determine_image_shape(input_collection: NDCollection) -> t.Tuple[int, int]:
-    """Evaluates the shape of the image in the input NDCollection
+def _determine_image_shape(input_collection: NDCollection) -> tuple[int, int]:
+    """Evaluates the shape of the image in the input NDCollection.

     Parameters
     ----------
@@ -280,14 +283,14 @@
     -------
     Tuple[int, int]
         shape of image data
+
     """
     keys = list(input_collection.keys())
-    shape = input_collection[keys[0]].data.shape
-    return shape
+    return input_collection[keys[0]].data.shape


 def add_alpha(input_data: NDCollection) -> NDCollection:
-    """Adds an alpha array to an image NDCollection
+    """Adds an alpha array to an image NDCollection.
Parameters ---------- @@ -298,24 +301,27 @@ def add_alpha(input_data: NDCollection) -> NDCollection: ------- NDCollection dataset with alpha array appended + """ # test if alpha exists. if not check if alpha keyword added. if not create default alpha with warning. img_shape = _determine_image_shape(input_data) - keys = list(input_data) + keys = list(input_data.keys()) wcs = input_data[keys[0]].wcs - metad = input_data[keys[0]].meta + meta = input_data[keys[0]].meta if len(img_shape) == 2: # it's an image and not just an array alpha = radial_north(img_shape) else: - raise ValueError(f"Data must be an image with 2 dimensions, found {len(img_shape)}.") - input_data.update(NDCollection([("alpha", NDCube(alpha, wcs=wcs, meta=metad))], meta={}, aligned_axes='all')) + msg = f"Data must be an image with 2 dimensions, found {len(img_shape)}." + raise ValueError(msg) + input_data.update(NDCollection([("alpha", NDCube(alpha, wcs=wcs, meta=meta))], meta={}, + aligned_axes="all")) return input_data def _compose2(f: t.Callable, g: t.Callable) -> t.Callable: - """Compose 2 functions together, i.e. f(g(x)) + """Compose 2 functions together, i.e. f(g(x)). Parameters ---------- @@ -328,12 +334,16 @@ def _compose2(f: t.Callable, g: t.Callable) -> t.Callable: ------- Callable composed function + """ - return lambda *a, **kw: f(g(*a, **kw), **kw) + def out(*a, **kw): + return f(g(*a, **kw), **kw) + out.uses_alpha = getattr(f, "uses_alpha", False) or getattr(g, "uses_alpha", False) + return out def identity(x: t.Any, **kwargs) -> t.Any: - """Identity function that returns the input + """Identity function that returns the input. Parameters ---------- @@ -344,5 +354,6 @@ def identity(x: t.Any, **kwargs) -> t.Any: ------- Any input value returned back + """ return x diff --git a/solpolpy/errors.py b/solpolpy/errors.py index a4f69ea..e75a2e4 100644 --- a/solpolpy/errors.py +++ b/solpolpy/errors.py @@ -1,10 +1,18 @@ -class TooFewFilesError(Exception): +class SolpolpyError(Exception): pass -class UnsupportedInstrumentError(Exception): +class TooFewFilesError(SolpolpyError): pass -class UnsupportedTransformationError(Exception): +class UnsupportedInstrumentError(SolpolpyError): + pass + + +class UnsupportedTransformationError(SolpolpyError): + pass + + +class MissingAlphaError(SolpolpyError): pass diff --git a/solpolpy/graph.py b/solpolpy/graph.py index da4f1b5..7ed4f94 100644 --- a/solpolpy/graph.py +++ b/solpolpy/graph.py @@ -1,63 +1,28 @@ -"""Constructs transformation graph""" +"""Constructs transformation graph.""" +from enum import StrEnum +from inspect import getmembers, isfunction + +import astropy.units as u import networkx as nx -from solpolpy.polarizers import ( - bp3_to_bthp, - bp3_to_mzp, - bpb_to_btbr, - bpb_to_mzp, - btbr_to_bpb, - btbr_to_mzp, - btbr_to_npol, - fourpol_to_stokes, - mzp_to_bp3, - mzp_to_bpb, - mzp_to_npol, - mzp_to_stokes, - npol_to_mzp, - stokes_to_mzp, -) +from solpolpy import transforms + +System = StrEnum("System", ["bpb", "npol", "stokes", "mzp", "btbr", "bthp", "fourpol", "bp3"]) +SYSTEM_REQUIRED_KEYS = {System.bpb: {"B", "pB"}, + System.npol: set(), + System.stokes: {"I", "Q", "U"}, + System.mzp: {"M", "Z", "P"}, + System.btbr: {"Bt", "Br"}, + System.bp3: {"B", "pB", "pBp"}, + System.bthp: {"B", "theta", "p"}, + System.fourpol: {str(q) for q in [0.0, 45.0, 90.0, 135.0] * u.degree}, + } transform_graph = nx.DiGraph() -transform_graph.add_edge("npol", "mzp", - func=npol_to_mzp, - requires_alpha=False) -transform_graph.add_edge("mzp", "bpb", - func=mzp_to_bpb, - requires_alpha=True) 
-transform_graph.add_edge("bpb", "mzp", - func=bpb_to_mzp, - requires_alpha=True) -transform_graph.add_edge("bpb", "btbr", - func=bpb_to_btbr, - requires_alpha=False) -transform_graph.add_edge("btbr", "bpb", - func=btbr_to_bpb, - requires_alpha=False) -transform_graph.add_edge("mzp", "stokes", - func=mzp_to_stokes, - requires_alpha=False) -transform_graph.add_edge("stokes", "mzp", - func=stokes_to_mzp, - requires_alpha=False) -transform_graph.add_edge("mzp", "bp3", - func=mzp_to_bp3, - requires_alpha=True) -transform_graph.add_edge("bp3", "mzp", - func=bp3_to_mzp, - requires_alpha=True) -transform_graph.add_edge("btbr", "mzp", - func=btbr_to_mzp, - requires_alpha=True) -transform_graph.add_edge("bp3", "bthp", - func=bp3_to_bthp, - requires_alpha=True) -transform_graph.add_edge("btbr", "npol", - func=btbr_to_npol, - requires_alpha=True) -transform_graph.add_edge("fourpol", "stokes", - func=fourpol_to_stokes, - requires_alpha=False) -transform_graph.add_edge("mzp", "npol", - func=mzp_to_npol, - requires_alpha=False) +polarizer_functions = getmembers(transforms, isfunction) +for function_name, function in polarizer_functions: + if "_to_" in function_name: + source, destination = function_name.split("_to_") + transform_graph.add_edge(System[source.lower()], + System[destination.lower()], + func=function) diff --git a/solpolpy/instruments.py b/solpolpy/instruments.py index 9ccf238..4e1691d 100644 --- a/solpolpy/instruments.py +++ b/solpolpy/instruments.py @@ -1,5 +1,4 @@ -"""Instrument specific code""" -import typing as t +"""Instrument specific code.""" import numpy as np from astropy.io import fits @@ -19,15 +18,17 @@ def get_data_angle(header): try: angle = float(angle_hdr) except ValueError: - raise UnsupportedInstrumentError("Polar angle in the header could not be read for this instrument.") + msg = "Polar angle in the header could not be read for this instrument." + raise UnsupportedInstrumentError(msg) return angle -def load_data(path_list: t.List[str], - mask: t.Optional[np.ndarray] = None, +def load_data(path_list: list[str], + mask: np.ndarray | None = None, use_instrument_mask: bool = False) -> NDCollection: """Basic loading function. See `load_with_occulter_mask`. + Parameters ---------- path_list: List[str] @@ -44,13 +45,15 @@ def load_data(path_list: t.List[str], NDCollection The data are loaded as NDCollection object with WCS and header information available. The keys are labeled as 'angle_1', 'angle_2, 'angle_3', ... + """ # get length of list to determine how many files to process. if len(path_list) < 2: - raise TooFewFilesError("Requires at least 2 FITS files") + msg = "Requires at least 2 FITS files" + raise TooFewFilesError(msg) data_out = [] - for i, data_path in enumerate(path_list): + for _i, data_path in enumerate(path_list): with fits.open(data_path) as hdul: wcs = WCS(hdul[0].header) @@ -75,7 +78,7 @@ def construct_mask(inner_radius: float, outer_radius: float, center_x: int, center_y: int, - shape: t.Tuple[int, int]) -> np.ndarray: + shape: tuple[int, int]) -> np.ndarray: """Constructs a mask where False indicates the pixel is valid and True masks the pixel as invalid. Pixels with radial distance between `inner_radius` and `outer_radius` are valid. Every other pixel is invalid. 
@@ -99,6 +102,7 @@ def construct_mask(inner_radius: float, np.ndarray a coronograph mask that marks invalid pixels (those with radius less than `inner_radius` or greater than `outer_radius`) as True + """ xx, yy = np.ogrid[0:shape[0], 0:shape[1]] mask = np.zeros(shape, dtype=bool) @@ -108,7 +112,7 @@ def construct_mask(inner_radius: float, def get_instrument_mask(header: fits.Header) -> np.ndarray: - """ Gets a coronograph mask for common instruments + """Gets a coronograph mask for common instruments. Supports common solar instruments: LASCO, COSMO K, SECCHI @@ -121,34 +125,38 @@ def get_instrument_mask(header: fits.Header) -> np.ndarray: ------- np.ndarray a pixel mask where valid pixels are marked False, masked out invalid pixels are flagged as True + """ - x_shape = header['NAXIS1'] - y_shape = header['NAXIS2'] + x_shape = header["NAXIS1"] + y_shape = header["NAXIS2"] - center_x = header['CRPIX1'] - center_y = header['CRPIX2'] + center_x = header["CRPIX1"] + center_y = header["CRPIX2"] - R_sun = 960. / header['CDELT1'] # TODO: clarify where 960. comes from + R_sun = 960. / header["CDELT1"] # TODO: clarify where 960. comes from radius = 0 - if header['INSTRUME'] == 'LASCO': - if 'C2' in header['DETECTOR']: + if header["INSTRUME"] == "LASCO": + if "C2" in header["DETECTOR"]: radius = 2.5 - elif 'C3' in header['DETECTOR']: + elif "C3" in header["DETECTOR"]: radius = 4.0 else: - raise UnsupportedInstrumentError("The data in this file is not formatted according to a known instrument.") - elif header['INSTRUME'] == 'COSMO K-Coronagraph': + msg = "The data in this file is not formatted according to a known instrument." + raise UnsupportedInstrumentError(msg) + elif header["INSTRUME"] == "COSMO K-Coronagraph": radius = 1.15 - elif header['INSTRUME'] == 'SECCHI': - if 'COR1' in header['DETECTOR']: + elif header["INSTRUME"] == "SECCHI": + if "COR1" in header["DETECTOR"]: radius = 1.57 - elif 'COR2' in header['DETECTOR']: + elif "COR2" in header["DETECTOR"]: radius = 3.0 else: - raise UnsupportedInstrumentError("The data in this file is not formatted according to a known instrument.") + msg = "The data in this file is not formatted according to a known instrument." + raise UnsupportedInstrumentError(msg) else: - raise UnsupportedInstrumentError("The data in this file is not formatted according to a known instrument.") + msg = "The data in this file is not formatted according to a known instrument." + raise UnsupportedInstrumentError(msg) y_array = [(j_step - center_y) / R_sun for j_step in range(y_shape)] outer_edge = np.max(y_array) * R_sun diff --git a/solpolpy/plotting.py b/solpolpy/plotting.py index 1ae1028..51caafa 100644 --- a/solpolpy/plotting.py +++ b/solpolpy/plotting.py @@ -13,15 +13,15 @@ def plot_collection(collection: NDCollection, show_colorbar=False, lat_ticks=None, lon_ticks=None, - major_formatter='dd', + major_formatter="dd", xlabel="HP Longitude", ylabel="HP Latitude", vmin=None, vmax=None, - cmap='Greys_r', + cmap="Greys_r", ignore_alpha=True, fontsize=18): - """Plot a solpolpy NDCollection input or output + """Plot a solpolpy NDCollection input or output. 
Parameters ---------- @@ -56,6 +56,7 @@ def plot_collection(collection: NDCollection, ------- Matplotlib figure and axes the plotted figure and axes are returned for any additional edits + """ ax_count = len(collection) collection_keys = list(collection.keys()) @@ -80,7 +81,7 @@ def plot_collection(collection: NDCollection, ncols=ax_count, figsize=figsize, sharey=True, - subplot_kw={'projection': collection[collection_keys[0]].wcs}) + subplot_kw={"projection": collection[collection_keys[0]].wcs}) for i in range(ax_count): this_cube = collection[collection_keys[i]] this_cube.plot(axes=axs[i], cmap=cmap, vmin=vmin[i], vmax=vmax[i]) @@ -92,17 +93,18 @@ def plot_collection(collection: NDCollection, axs[i].coords[1].set_major_formatter(major_formatter) axs[i].set_xlabel(xlabel, fontsize=fontsize) axs[i].set_ylabel(ylabel, fontsize=fontsize) - axs[i].tick_params(axis='both', labelsize=fontsize) - axs[i].grid(color='white', ls='dotted') + axs[i].tick_params(axis="both", labelsize=fontsize) + axs[i].grid(color="white", ls="dotted") if show_colorbar: - fig.colorbar(im, orientation='horizontal', ax=axs, shrink=0.9) + fig.colorbar(im, orientation="horizontal", ax=axs, shrink=0.9) return fig, axs def get_colormap_str(meta: fits.Header) -> str: - """Retrieve a color map name from an input FITS file + """Retrieve a color map name from an input FITS file. + Parameters ---------- meta : fits.Header @@ -112,30 +114,30 @@ def get_colormap_str(meta: fits.Header) -> str: ------- str name of appropriate colormap - """ - if meta['INSTRUME'] == 'LASCO': - detector_name = meta['DETECTOR'] - if 'C2' in detector_name: - color_map = 'soholasco2' - elif 'C3' in detector_name: - color_map = 'soholasco3' + """ + if meta["INSTRUME"] == "LASCO": + detector_name = meta["DETECTOR"] + if "C2" in detector_name: + color_map = "soholasco2" + elif "C3" in detector_name: + color_map = "soholasco3" else: warnings.warn("No valid instrument found, setting color_map soholasco2") - color_map = 'soholasco2' - elif meta['INSTRUME'] == 'COSMO K-Coronagraph': - color_map = 'kcor' - elif meta['INSTRUME'] == 'SECCHI': - detector_name = meta['DETECTOR'] - if 'COR1' in detector_name: - color_map = 'stereocor1' - elif 'COR2' in detector_name: - color_map = 'stereocor2' + color_map = "soholasco2" + elif meta["INSTRUME"] == "COSMO K-Coronagraph": + color_map = "kcor" + elif meta["INSTRUME"] == "SECCHI": + detector_name = meta["DETECTOR"] + if "COR1" in detector_name: + color_map = "stereocor1" + elif "COR2" in detector_name: + color_map = "stereocor2" else: warnings.warn("No valid instrument found, setting color_map soholasco2") - color_map = 'soholasco2' + color_map = "soholasco2" else: warnings.warn("No valid instrument found, setting color_map soholasco2") - color_map = 'soholasco2' + color_map = "soholasco2" return color_map diff --git a/solpolpy/polarizers.py b/solpolpy/polarizers.py deleted file mode 100644 index c5d6dfe..0000000 --- a/solpolpy/polarizers.py +++ /dev/null @@ -1,474 +0,0 @@ -""" -Polarizer functions for solpolpy. -Found in: -DeForest, C. E., Seaton, D. B., & West, M. J. (2022). -Three-polarizer Treatment of Linear Polarization in Coronagraphs and Heliospheric Imagers. -The Astrophysical Journal, 927(1), 98. 
-""" -import copy - -import astropy.units as u -import numpy as np -from ndcube import NDCollection, NDCube - - -def conv_polar_from_head(input_cube): - return int(float(str(input_cube.meta['POLAR']).strip(" Deg"))) - - -# TODO: prepare a config file where the reference angle say of STEREO, KCor etc can be set -def npol_to_mzp(input_collection, offset_angle=0, **kwargs): - """ - Notes - ------ - Equation 44 in DeForest et al. 2022. - - """"" - input_keys = list(input_collection.keys()) - input_dict = {} - in_list = list(input_collection) - - for p_angle in in_list: - if p_angle == "alpha": - break - input_dict[(conv_polar_from_head(input_collection[p_angle])) * u.degree] = input_collection[p_angle].data - - mzp_ang = [-60 * u.degree, 0 * u.degree, 60 * u.degree] - Bmzp = {} - for ang in mzp_ang: - Bmzp[ang] = ((1 / 3) * np.sum([ith_polarizer_brightness * (1 + 2 * np.cos(2 * (ang - (ith_angle-offset_angle)))) - for ith_angle, ith_polarizer_brightness in input_dict.items()], axis=0)) - - # todo: update header properly; time info? - metaM, metaZ, metaP = (copy.copy(input_collection[input_keys[0]].meta), - copy.copy(input_collection[input_keys[0]].meta), - copy.copy(input_collection[input_keys[0]].meta)) - metaM.update(POLAR=-60) - metaZ.update(POLAR=0) - metaP.update(POLAR=60) - mask = combine_masks(*[input_collection[k].mask for k in input_keys]) - Bmzp_cube = [] - Bmzp_cube.append(("Bm", NDCube(Bmzp[-60 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaM))) - Bmzp_cube.append(("Bz", NDCube(Bmzp[0 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaZ))) - Bmzp_cube.append(("Bp", NDCube(Bmzp[60 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaP))) - for p_angle in in_list: - if p_angle.lower() == "alpha": - Bmzp_cube.append(("alpha", NDCube(input_collection['alpha'].data * u.radian, - wcs=input_collection[input_keys[0]].wcs, - meta=metaP))) - return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") - - -def mzp_to_bpb(input_collection, **kwargs): - """ - Notes - ------ - Equation 7 and 9 in DeForest et al. 2022. - - """"" - # TODO: need to check if 3 angles are input. - # TODO: need to check if separated appropriately if not create quality warning. 
- input_dict = {} - in_list = list(input_collection) - - for p_angle in in_list: - if p_angle == "alpha": - break - input_dict[(input_collection[p_angle].meta['POLAR']) * u.degree] = input_collection[p_angle].data - - alpha = input_collection['alpha'].data * u.radian - B = (2 / 3) * (np.sum([ith_polarizer_brightness - for ith_angle, ith_polarizer_brightness - in input_dict.items() if ith_angle != "alpha"], axis=0)) - - pB = (-4 / 3) * (np.sum([ith_polarizer_brightness - * np.cos(2 * (ith_angle - alpha)) - for ith_angle, ith_polarizer_brightness - in input_dict.items() if ith_angle != "alpha"], axis=0)) - metaB, metapB = copy.copy(input_collection["Bm"].meta), copy.copy(input_collection["Bm"].meta) - metaB.update(POLAR='B'), metapB.update(POLAR='pB') - mask = combine_masks(input_collection["Bm"].mask, input_collection["Bz"].mask, input_collection["Bp"].mask) - BpB_cube = [] - BpB_cube.append(("B", NDCube(B, wcs=input_collection["Bm"].wcs, mask=mask, meta=metaB))) - BpB_cube.append(("pB", NDCube(pB, wcs=input_collection["Bm"].wcs, mask=mask, meta=metapB))) - BpB_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bm"].wcs, mask=mask))) - # todo: WCS for alpha needs to be generated wrt to solar north - - return NDCollection(BpB_cube, meta={}, aligned_axes="all") - - -def bpb_to_mzp(input_collection, **kwargs): - """ - Notes - ------ - Equation 4 in DeForest et al. 2022. - """ - - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - alpha = input_collection['alpha'].data * u.radian - B, pB = input_collection["B"].data, input_collection["pB"].data - mzp_ang = [-60, 0, 60] - Bmzp = {} - for ang in mzp_ang: - Bmzp[ang * u.degree] = (1 / 2) * (B - pB * (np.cos(2 * (ang * u.degree - alpha)))) - - metaM, metaZ, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["B"].meta), copy.copy( - input_collection["B"].meta) - metaM.update(POLAR=-60) - metaZ.update(POLAR=0) - metaP.update(POLAR=60) - mask = combine_masks(input_collection["B"].mask, input_collection["pB"].mask) - Bmzp_cube = [] - Bmzp_cube.append(("Bm", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaM))) - Bmzp_cube.append(("Bz", NDCube(Bmzp[0 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaZ))) - Bmzp_cube.append(("Bp", NDCube(Bmzp[60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaP))) - Bmzp_cube.append(("alpha", NDCube(alpha, wcs=input_collection["B"].wcs))) - # todo: WCS for alpha needs to be generated wrt to solar north - - return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") - - -def bpb_to_btbr(input_collection, **kwargs): - """ - Notes - ------ - Equation 1 and 2 in DeForest et al. 2022. 
- """ - - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - alpha = input_collection['alpha'].data * u.radian - B, pB = input_collection['B'].data, input_collection['pB'].data - Br = (B - pB) / 2 - Bt = (B + pB) / 2 - - metaBr, metaBt = copy.copy(input_collection["B"].meta), copy.copy(input_collection["B"].meta) - metaBr.update(POLAR='Br'), metaBt.update(POLAR='Bt') - mask = combine_masks(input_collection["B"].mask, input_collection["pB"].mask) - BtBr_cube = [] - BtBr_cube.append(("Bt", NDCube(Bt, wcs=input_collection["B"].wcs, mask=mask, meta=metaBt))) - BtBr_cube.append(("Br", NDCube(Br, wcs=input_collection["B"].wcs, mask=mask, meta=metaBr))) - BtBr_cube.append(("alpha", NDCube(alpha, wcs=input_collection["B"].wcs))) - # todo: WCS for alpha needs to be generated wrt to solar north - - return NDCollection(BtBr_cube, meta={}, aligned_axes="all") - - -def btbr_to_bpb(input_collection, **kwargs): - """ - Notes - ------ - Equation in Table 1 in DeForest et al. 2022. - """ - - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - alpha = input_collection['alpha'].data * u.radian - Bt, Br = input_collection['Bt'].data, input_collection['Br'].data - pB = (Bt - Br) - B = (Bt + Br) - - metaB, metapB = copy.copy(input_collection["Bt"].meta), copy.copy(input_collection["Bt"].meta) - metaB.update(POLAR='B'), metapB.update(POLAR='pB') - mask = combine_masks(input_collection["Bt"].mask, input_collection["Br"].mask) - BpB_cube = [] - BpB_cube.append(("B", NDCube(B, wcs=input_collection["Bt"].wcs, mask=mask, meta=metaB))) - BpB_cube.append(("pB", NDCube(pB, wcs=input_collection["Bt"].wcs, mask=mask, meta=metapB))) - BpB_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask))) - # todo: WCS for alpha needs to be generated wrt to solar north - - return NDCollection(BpB_cube, meta={}, aligned_axes="all") - - -def mzp_to_stokes(input_collection, **kwargs): - """ - Notes - ------ - Equation 9, 12 and 13 in DeForest et al. 2022. - """ - Bm, Bz, Bp = input_collection["Bm"].data, input_collection["Bz"].data, input_collection["Bp"].data - - mulmx = (2 / 3) * np.array([[1, 1, 1], [-1, 2, -1], [-np.sqrt(3), 0, np.sqrt(3)]]) - - Bi = mulmx[0, 0] * Bm + mulmx[0, 1] * Bz + mulmx[0, 2] * Bp - Bq = mulmx[1, 0] * Bm + mulmx[1, 1] * Bz + mulmx[1, 2] * Bp - Bu = mulmx[2, 0] * Bm + mulmx[2, 1] * Bz + mulmx[2, 2] * Bp - - metaI, metaQ, metaU = copy.copy(input_collection["Bm"].meta), copy.copy(input_collection["Bz"].meta), copy.copy( - input_collection["Bp"].meta) - metaI.update(POLAR='Stokes I'), metaQ.update(POLAR='Stokes Q'), metaU.update(POLAR='Stokes U') - mask = combine_masks(input_collection["Bm"].mask, input_collection["Bz"].mask, input_collection["Bp"].mask) - BStokes_cube = [] - BStokes_cube.append(("Bi", NDCube(Bi, wcs=input_collection["Bm"].wcs, mask=mask, meta=metaI))) - BStokes_cube.append(("Bq", NDCube(Bq, wcs=input_collection["Bm"].wcs, mask=mask, meta=metaQ))) - BStokes_cube.append(("Bu", NDCube(Bu, wcs=input_collection["Bm"].wcs, mask=mask, meta=metaU))) - return NDCollection(BStokes_cube, meta={}, aligned_axes="all") - - -def stokes_to_mzp(input_collection, **kwargs): - """ - Notes - ------ - Equation 11 in DeForest et al. 2022. 
with alpha = np.pi/2 - """ - - alpha = np.pi / 2 - Bi, Bq, Bu = input_collection["Bi"].data, input_collection["Bq"].data, input_collection["Bu"].data - - inv_mul_mx = (1 / 2) * np.array([[1, -np.cos(2 * (-np.pi / 3 - alpha)), -np.sin(2 * (-np.pi / 3 - alpha))], - [1, -np.cos(2 * (0 - alpha)), 0], - [1, -np.cos(2 * (np.pi / 3 - alpha)), -np.sin(2 * (np.pi / 3 - alpha))]]) - - Bm = inv_mul_mx[0, 0] * Bi + inv_mul_mx[0, 1] * Bq + inv_mul_mx[0, 2] * Bu - Bz = inv_mul_mx[1, 0] * Bi + inv_mul_mx[1, 1] * Bq + inv_mul_mx[1, 2] * Bu - Bp = inv_mul_mx[2, 0] * Bi + inv_mul_mx[2, 1] * Bq + inv_mul_mx[2, 2] * Bu - - metaM, metaZ, metaP = copy.copy(input_collection["Bi"].meta), copy.copy(input_collection["Bq"].meta), copy.copy( - input_collection["Bu"].meta) - metaM.update(POLAR=-60), metaZ.update(POLAR=0), metaP.update(POLAR=60) - mask = combine_masks(input_collection["Bi"].mask, input_collection["Bq"].mask, input_collection["Bu"].mask) - Bmzp_cube = [] - Bmzp_cube.append(("Bm", NDCube(Bm, wcs=input_collection["Bi"].wcs, mask=mask, meta=metaM))) - Bmzp_cube.append(("Bz", NDCube(Bz, wcs=input_collection["Bi"].wcs, mask=mask, meta=metaZ))) - Bmzp_cube.append(("Bp", NDCube(Bp, wcs=input_collection["Bi"].wcs, mask=mask, meta=metaP))) - Bmzp_cube.append(("alpha", NDCube(np.zeros_like(Bm) + alpha, wcs=input_collection["Bi"].wcs, mask=mask))) - - return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") - - -def mzp_to_bp3(input_collection, **kwargs): - """ - Notes - ------ - Equation 7, 9 and 10 in DeForest et al. 2022. - """"" - input_dict = {} - in_list = list(input_collection) - - for p_angle in in_list: - if p_angle == "alpha": - break - input_dict[(input_collection[p_angle].meta['POLAR'] * u.degree)] = input_collection[p_angle].data - - alpha = input_collection['alpha'].data * u.radian - B = (2 / 3) * (np.sum([ith_polarizer_brightness for ith_angle, ith_polarizer_brightness - in input_dict.items() if ith_angle != "alpha"], axis=0)) - - pB = (-4 / 3) * (np.sum([ith_polarizer_brightness - * np.cos(2 * (ith_angle - alpha)) - for ith_angle, ith_polarizer_brightness - in input_dict.items() if ith_angle != "alpha"], axis=0)) - - pBp = (-4 / 3) * (np.sum([ith_polarizer_brightness * np.sin(2 * (ith_angle - alpha)) - for ith_angle, ith_polarizer_brightness - in input_dict.items() if ith_angle != "alpha"], axis=0)) - # todo: update header properly - metaB, metapB, metapBp = copy.copy(input_collection["Bm"].meta), copy.copy(input_collection["Bm"].meta), copy.copy( - input_collection["Bm"].meta) - metaB.update(POLAR='B'), metapB.update(POLAR='pB'), metapBp.update(POLAR='pB-prime') - mask = combine_masks(input_collection["Bm"].mask, input_collection["Bz"].mask, input_collection["Bp"].mask) - Bp3_cube = [] - Bp3_cube.append(("B", NDCube(B, wcs=input_collection["Bm"].wcs, mask=mask, meta=metaB))) - Bp3_cube.append(("pB", NDCube(pB, wcs=input_collection["Bm"].wcs, mask=mask, meta=metapB))) - Bp3_cube.append(("pBp", NDCube(pBp, wcs=input_collection["Bm"].wcs, mask=mask, meta=metapBp))) - Bp3_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bm"].wcs, mask=mask))) - # todo: WCS for alpha needs to be generated wrt to solar north - - return NDCollection(Bp3_cube, meta={}, aligned_axes="all") - - -def bp3_to_mzp(input_collection, **kwargs): - """ - Notes - ------ - Equation 11 in DeForest et al. 2022. 
- """"" - - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - B, pB, pBp = input_collection['B'].data, input_collection['pB'].data, input_collection['pBp'].data - alpha = input_collection['alpha'].data * u.radian - - mzp_ang = [-60, 0, 60] * u.degree - Bmzp = {} - for ang in mzp_ang: - Bmzp[ang] = (1 / 2) * (B - np.cos(2 * (ang - alpha)) * pB - - np.cos(2 * (ang - alpha)) * pBp) - - metaM, metaZ, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["pB"].meta), copy.copy( - input_collection["pBp"].meta) - metaM.update(POLAR=-60), metaZ.update(POLAR=0), metaP.update(POLAR=60) - mask = combine_masks(input_collection["B"].mask, input_collection["pB"].mask, input_collection["pBp"].mask) - Bmzp_cube = [] - Bmzp_cube.append(("Bm", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaM))) - Bmzp_cube.append(("Bz", NDCube(Bmzp[0 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaZ))) - Bmzp_cube.append(("Bp", NDCube(Bmzp[60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaP))) - Bmzp_cube.append(("alpha", NDCube(alpha, wcs=input_collection["B"].wcs, mask=mask))) - - return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") - - -def btbr_to_mzp(input_collection, **kwargs): - """ - Notes - ------ - Equation 3 in DeForest et al. 2022. - """ - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - alpha = input_collection['alpha'].data * u.radian - Bt = input_collection['Bt'].data - Br = input_collection['Br'].data - - mzp_ang = [-60, 0, 60] * u.degree - Bmzp = {} - for ang in mzp_ang: - Bmzp[ang] = Bt * (np.sin(ang - alpha)) ** 2 + Br * (np.cos(ang - alpha)) ** 2 - - metaM, metaZ, metaP = copy.copy(input_collection["Bt"].meta), copy.copy(input_collection["Bt"].meta), copy.copy( - input_collection["Bt"].meta) - metaM.update(POLAR=-60), metaZ.update(POLAR=0), metaP.update(POLAR=60) - mask = combine_masks(input_collection["Bt"].mask, input_collection["Br"].mask) - Bmzp_cube = [] - Bmzp_cube.append(("Bm", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaM))) - Bmzp_cube.append(("Bz", NDCube(Bmzp[0 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaZ))) - Bmzp_cube.append(("Bp", NDCube(Bmzp[60 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaP))) - Bmzp_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask))) - - return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") - - -def bp3_to_bthp(input_collection, **kwargs): - """ - Notes - ------ - Equations 9, 15, 16 in DeForest et al. 2022. 
- """"" - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - B, pB, pBp = input_collection['B'].data, input_collection['pB'].data, input_collection['pBp'].data - alpha = input_collection['alpha'].data * u.radian - - theta_mx = (1 / 2) * np.arctan2(pBp, pB) * u.radian + np.pi / 2 * u.radian + alpha - p = np.sqrt(pB ** 2 + pBp ** 2) / B - - metaTh, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["pB"].meta) - metaTh.update(POLAR='Theta'), metaP.update(POLAR='Degree of Polarization') - mask = combine_masks(input_collection["B"].mask, input_collection["pB"].mask, input_collection["pBp"].mask) - Bthp_cube = [] - Bthp_cube.append(("B", NDCube(B, wcs=input_collection["B"].wcs, mask=mask, meta=input_collection['B'].meta))) - Bthp_cube.append(("theta", NDCube(theta_mx, wcs=input_collection["B"].wcs, mask=mask, meta=metaTh))) - Bthp_cube.append(("p", NDCube(p, wcs=input_collection["B"].wcs, mask=mask, meta=metaP))) - - return NDCollection(Bthp_cube, meta={}, aligned_axes="all") - - -def btbr_to_npol(input_collection, out_angles=None, **kwargs): - """ - Notes - ------ - Equation 3 in DeForest et al. 2022. - angles: list of input angles in degree - """ - if "alpha" not in input_collection: - raise ValueError("missing alpha") - - if out_angles is None: - raise ValueError("out_angles must be defined as a list of floats") - - alpha = input_collection['alpha'].data * u.radian - Bt, Br = input_collection['Bt'].data, input_collection['Br'].data - - npol_ang = out_angles - Bnpol = {} - Bnpol_cube = [] - mask = combine_masks(input_collection["Bt"].mask, input_collection["Br"].mask) - for ang in npol_ang: - Bnpol[ang] = Bt * (np.sin(ang * u.degree - alpha)) ** 2 + Br * (np.cos(ang * u.degree - alpha)) ** 2 - meta_tmp = copy.copy(input_collection["Bt"].meta) - meta_tmp.update(POLAR=ang) - Bnpol_cube.append(('B' + str(ang), NDCube(Bnpol[ang], wcs=input_collection["Bt"].wcs, mask=mask, meta=meta_tmp))) - Bnpol_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask))) - - return NDCollection(Bnpol_cube, meta={}, aligned_axes="all") - - -def mzp_to_npol(input_collection, out_angles=None, offset_angle=0, **kwargs): - """ - Notes - ------ - Equation 45 in DeForest et al. 2022. - angles: list of input angles in degree - """ - if out_angles is None: - raise ValueError("out_angles must be defined as a list of floats") - - in_list = list(input_collection) - input_dict = {} - - for p_angle in in_list: - if p_angle == "alpha": - break - input_dict[(input_collection[p_angle].meta['POLAR'] * u.degree)] = input_collection[p_angle].data - - npol_ang = out_angles - Bnpol = {} - Bnpol_cube = [] - mask = combine_masks(*[input_collection[k].mask for k in in_list]) - for ang in npol_ang: - Bnpol[ang] = (1/3) * np.sum([v.data * (4 * np.power(np.cos(ang - k - offset_angle), 2)) - 1 for k, v in input_dict.items()], axis=0) - meta_tmp = copy.copy(input_collection[in_list[0]].meta) - meta_tmp.update(Polar=ang) - Bnpol_cube.append(('B' + str(ang), NDCube(Bnpol[ang], wcs=input_collection[in_list[0]].wcs, mask=mask, meta=meta_tmp))) - - if "alpha" in input_collection: - alpha = input_collection['alpha'].data * u.radian - Bnpol_cube.append(("alpha", NDCube(alpha, wcs=input_collection[in_list[0]].wcs, mask=mask))) - - return NDCollection(Bnpol_cube, meta={}, aligned_axes="all") - - -def fourpol_to_stokes(input_collection, **kwargs): - """ - Notes - ------ - Table 1 in DeForest et al. 2022. 
- - """"" - Bi = input_collection["B0"].data + input_collection["B90"].data - Bq = input_collection["B90"].data - input_collection["B0"].data - Bu = input_collection["B135"].data - input_collection["B45"].data - - metaI, metaQ, metaU = (copy.copy(input_collection["B0"].meta), - copy.copy(input_collection["B0"].meta), - copy.copy(input_collection["B0"].meta)) - mask = combine_masks(input_collection["B0"].mask, - input_collection["B45"].mask, - input_collection["B90"].mask, - input_collection["B135"].mask) - BStokes_cube = [] - BStokes_cube.append(("Bi", NDCube(Bi, wcs=input_collection["B0"].wcs, mask=mask, meta=metaI))) - BStokes_cube.append(("Bq", NDCube(Bq, wcs=input_collection["B0"].wcs, mask=mask, meta=metaQ))) - BStokes_cube.append(("Bu", NDCube(Bu, wcs=input_collection["B0"].wcs, mask=mask, meta=metaU))) - BStokes_cube.append(("Bu", NDCube(Bu, wcs=input_collection["B0"].wcs, mask=mask, meta=metaU))) - - return NDCollection(BStokes_cube, meta={}, aligned_axes="all") - - -def combine_masks(*args): - """ Combines masks - - If any of the masks are None, the result is None. - Otherwise, when combining any value that is masked in any of the input args, gets masked, i.e. it does a logical or. - """ - if any(arg is None for arg in args): - return None - else: - return np.logical_or.reduce(args) diff --git a/solpolpy/system.py b/solpolpy/system.py new file mode 100644 index 0000000..ff29529 --- /dev/null +++ b/solpolpy/system.py @@ -0,0 +1,44 @@ + +from enum import StrEnum + +from ndcube import NDCollection + +System = StrEnum("System", ["BpB", "angles", "Stokes"]) +SYSTEM_REQUIRED_KEYS = {System.BpB: ["B", "pB"], + System.Stokes: ["I", "Q", "U"]} + + +def transform_a(input: NDCollection) -> NDCollection: + return input + + +def use_alpha(transform): + def wrapped(*args, **kwargs) -> None: + pass + wrapped.uses_alpha = True + return wrapped + + +@use_alpha +def transform_b(input: NDCollection) -> NDCollection: + assert "alpha" in input + return input + + +def check_alpha(func): + return getattr(func, "uses_alpha", False) + + +if __name__ == "__main__": + from inspect import getmembers, isfunction + + import networkx as nx + + from solpolpy import polarizers + + transform_graph = nx.DiGraph() + polarizer_functions = getmembers(polarizers, isfunction) + for function_name, function in polarizer_functions: + if "_to_" in function_name: + source, destination = function_name.split("_to_") + transform_graph.add_edge(source, destination, func=function) diff --git a/solpolpy/transforms.py b/solpolpy/transforms.py new file mode 100644 index 0000000..de6a59b --- /dev/null +++ b/solpolpy/transforms.py @@ -0,0 +1,474 @@ +"""Polarizer functions for solpolpy. +Found in: +DeForest, C. E., Seaton, D. B., & West, M. J. (2022). +Three-polarizer Treatment of Linear Polarization in Coronagraphs and Heliospheric Imagers. +The Astrophysical Journal, 927(1), 98. +""" +import copy + +import astropy.units as u +import numpy as np +from ndcube import NDCollection, NDCube + +from solpolpy.util import combine_all_collection_masks, use_alpha + + +# TODO: prepare a config file where the reference angle say of STEREO, KCor etc can be set +def npol_to_mzp(input_collection, offset_angle=0, **kwargs): + """ + Notes + ------ + Equation 44 in DeForest et al. 2022. 
+ + """"" + input_keys = list(input_collection.keys()) + input_dict = {} + in_list = list(input_collection) + + for p_angle in in_list: + if p_angle == "alpha": + break + input_dict[u.Quantity(p_angle)] = input_collection[p_angle].data + + mzp_angles = [-60, 0, 60] * u.degree + Bmzp = {} + for target_angle in mzp_angles: + Bmzp[target_angle] = ((1 / 3) * np.sum([ith_polarizer_brightness * (1 + 2 * np.cos(2 * (target_angle - (ith_angle-offset_angle)))) + for ith_angle, ith_polarizer_brightness in input_dict.items()], axis=0)) + + # TODO: update header properly; time info? + metaM, metaZ, metaP = (copy.copy(input_collection[input_keys[0]].meta), + copy.copy(input_collection[input_keys[0]].meta), + copy.copy(input_collection[input_keys[0]].meta)) + metaM.update(POLAR=-60*u.degree) + metaZ.update(POLAR=0*u.degree) + metaP.update(POLAR=60*u.degree) + mask = combine_all_collection_masks(input_collection) + Bmzp_cube = [("M", NDCube(Bmzp[-60 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaM)), + ("Z", NDCube(Bmzp[0 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaZ)), + ("P", NDCube(Bmzp[60 * u.degree], wcs=input_collection[input_keys[0]].wcs, mask=mask, meta=metaP))] + for p_angle in in_list: + if p_angle.lower() == "alpha": + Bmzp_cube.append(("alpha", NDCube(input_collection["alpha"].data * u.radian, + wcs=input_collection[input_keys[0]].wcs, + meta=metaP))) + return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") + + +@use_alpha +def mzp_to_bpb(input_collection, **kwargs): + """ + Notes + ------ + Equation 7 and 9 in DeForest et al. 2022. + + """"" + # TODO: need to check if 3 angles are input. + # TODO: need to check if separated appropriately if not create quality warning. + input_dict = {} + in_list = list(input_collection) + + for p_angle in in_list: + if p_angle == "alpha": + break + input_dict[input_collection[p_angle].meta["POLAR"]] = input_collection[p_angle].data + + alpha = input_collection["alpha"].data * u.radian + B = (2 / 3) * (np.sum([ith_polarizer_brightness + for ith_angle, ith_polarizer_brightness + in input_dict.items() if ith_angle != "alpha"], axis=0)) + + pB = (-4 / 3) * (np.sum([ith_polarizer_brightness + * np.cos(2 * (ith_angle - alpha)) + for ith_angle, ith_polarizer_brightness + in input_dict.items() if ith_angle != "alpha"], axis=0)) + metaB, metapB = copy.copy(input_collection["M"].meta), copy.copy(input_collection["M"].meta) + metaB.update(POLAR="B") + metapB.update(POLAR="pB") + + mask = combine_all_collection_masks(input_collection) + + BpB_cube = [("B", NDCube(B, wcs=input_collection["M"].wcs, mask=mask, meta=metaB)), + ("pB", NDCube(pB, wcs=input_collection["M"].wcs, mask=mask, meta=metapB)), + ("alpha", NDCube(alpha, wcs=input_collection["M"].wcs, mask=mask))] + # TODO: WCS for alpha needs to be generated wrt to solar north + + return NDCollection(BpB_cube, meta={}, aligned_axes="all") + + +@use_alpha +def bpb_to_mzp(input_collection, **kwargs): + """Notes + ----- + Equation 4 in DeForest et al. 2022. 
+ """ + if "alpha" not in input_collection: + msg = "missing alpha" + raise ValueError(msg) + + alpha = input_collection["alpha"].data * u.radian + B, pB = input_collection["B"].data, input_collection["pB"].data + mzp_angles = [-60, 0, 60] * u.degree + Bmzp = {} + for angle in mzp_angles: + Bmzp[angle] = (1 / 2) * (B - pB * (np.cos(2 * (angle - alpha)))) + + metaM, metaZ, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["B"].meta), copy.copy( + input_collection["B"].meta) + metaM.update(POLAR=-60*u.degree) + metaZ.update(POLAR=0*u.degree) + metaP.update(POLAR=60*u.degree) + mask = combine_all_collection_masks(input_collection) + Bmzp_cube = [("M", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaM)), + ("Z", NDCube(Bmzp[0 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaZ)), + ("P", NDCube(Bmzp[60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaP)), + ("alpha", NDCube(alpha, wcs=input_collection["B"].wcs))] + # TODO: WCS for alpha needs to be generated wrt to solar north + + return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") + + +@use_alpha +def bpb_to_btbr(input_collection, **kwargs): + """Notes + ----- + Equation 1 and 2 in DeForest et al. 2022. + """ + if "alpha" not in input_collection: + msg = "missing alpha" + raise ValueError(msg) + + alpha = input_collection["alpha"].data * u.radian + B, pB = input_collection["B"].data, input_collection["pB"].data + Br = (B - pB) / 2 + Bt = (B + pB) / 2 + + metaBr, metaBt = copy.copy(input_collection["B"].meta), copy.copy(input_collection["B"].meta) + metaBr.update(POLAR="Br") + metaBt.update(POLAR="Bt") + + mask = combine_all_collection_masks(input_collection) + BtBr_cube = [("Bt", NDCube(Bt, wcs=input_collection["B"].wcs, mask=mask, meta=metaBt)), + ("Br", NDCube(Br, wcs=input_collection["B"].wcs, mask=mask, meta=metaBr)), + ("alpha", NDCube(alpha, wcs=input_collection["B"].wcs))] + # TODO: WCS for alpha needs to be generated wrt to solar north + + return NDCollection(BtBr_cube, meta={}, aligned_axes="all") + + +@use_alpha +def btbr_to_bpb(input_collection, **kwargs): + """Notes + ----- + Equation in Table 1 in DeForest et al. 2022. + """ + if "alpha" not in input_collection: + msg = "missing alpha" + raise ValueError(msg) + + alpha = input_collection["alpha"].data * u.radian + Bt, Br = input_collection["Bt"].data, input_collection["Br"].data + pB = (Bt - Br) + B = (Bt + Br) + + metaB, metapB = copy.copy(input_collection["Bt"].meta), copy.copy(input_collection["Bt"].meta) + metaB.update(POLAR="B") + metapB.update(POLAR="pB") + + mask = combine_all_collection_masks(input_collection) + BpB_cube = [("B", NDCube(B, wcs=input_collection["Bt"].wcs, mask=mask, meta=metaB)), + ("pB", NDCube(pB, wcs=input_collection["Bt"].wcs, mask=mask, meta=metapB)), + ("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask))] + # TODO: WCS for alpha needs to be generated wrt to solar north + + return NDCollection(BpB_cube, meta={}, aligned_axes="all") + + +def mzp_to_stokes(input_collection, **kwargs): + """Notes + ----- + Equation 9, 12 and 13 in DeForest et al. 2022. 
+ """ + Bm, Bz, Bp = input_collection["M"].data, input_collection["Z"].data, input_collection["P"].data + + mueller_matrix = (2 / 3) * np.array([[1, 1, 1], [-1, 2, -1], [-np.sqrt(3), 0, np.sqrt(3)]]) + + Bi = mueller_matrix[0, 0] * Bm + mueller_matrix[0, 1] * Bz + mueller_matrix[0, 2] * Bp + Bq = mueller_matrix[1, 0] * Bm + mueller_matrix[1, 1] * Bz + mueller_matrix[1, 2] * Bp + Bu = mueller_matrix[2, 0] * Bm + mueller_matrix[2, 1] * Bz + mueller_matrix[2, 2] * Bp + + metaI, metaQ, metaU = copy.copy(input_collection["M"].meta), copy.copy(input_collection["Z"].meta), copy.copy( + input_collection["P"].meta) + metaI.update(POLAR="Stokes I") + metaQ.update(POLAR="Stokes Q") + metaU.update(POLAR="Stokes U") + + mask = combine_all_collection_masks(input_collection) + BStokes_cube = [("I", NDCube(Bi, wcs=input_collection["M"].wcs, mask=mask, meta=metaI)), + ("Q", NDCube(Bq, wcs=input_collection["M"].wcs, mask=mask, meta=metaQ)), + ("U", NDCube(Bu, wcs=input_collection["M"].wcs, mask=mask, meta=metaU))] + return NDCollection(BStokes_cube, meta={}, aligned_axes="all") + + +def stokes_to_mzp(input_collection, **kwargs): + """Notes + ----- + Equation 11 in DeForest et al. 2022. with alpha = np.pi/2 + """ + alpha = np.pi / 2 + Bi, Bq, Bu = input_collection["I"].data, input_collection["Q"].data, input_collection["U"].data + + inv_mul_mx = (1 / 2) * np.array([[1, -np.cos(2 * (-np.pi / 3 - alpha)), -np.sin(2 * (-np.pi / 3 - alpha))], + [1, -np.cos(2 * (0 - alpha)), 0], + [1, -np.cos(2 * (np.pi / 3 - alpha)), -np.sin(2 * (np.pi / 3 - alpha))]]) + + Bm = inv_mul_mx[0, 0] * Bi + inv_mul_mx[0, 1] * Bq + inv_mul_mx[0, 2] * Bu + Bz = inv_mul_mx[1, 0] * Bi + inv_mul_mx[1, 1] * Bq + inv_mul_mx[1, 2] * Bu + Bp = inv_mul_mx[2, 0] * Bi + inv_mul_mx[2, 1] * Bq + inv_mul_mx[2, 2] * Bu + + metaM, metaZ, metaP = copy.copy(input_collection["I"].meta), copy.copy(input_collection["Q"].meta), copy.copy( + input_collection["U"].meta) + metaM.update(POLAR=-60*u.degree) + metaZ.update(POLAR=0*u.degree) + metaP.update(POLAR=60*u.degree) + + mask = combine_all_collection_masks(input_collection) + Bmzp_cube = [("M", NDCube(Bm, wcs=input_collection["I"].wcs, mask=mask, meta=metaM)), + ("Z", NDCube(Bz, wcs=input_collection["I"].wcs, mask=mask, meta=metaZ)), + ("P", NDCube(Bp, wcs=input_collection["I"].wcs, mask=mask, meta=metaP)), + ("alpha", NDCube(np.zeros_like(Bm) + alpha, wcs=input_collection["I"].wcs, mask=mask))] + + return NDCollection(Bmzp_cube, meta={}, aligned_axes="all") + + +@use_alpha +def mzp_to_bp3(input_collection, **kwargs): + """ + Notes + ------ + Equation 7, 9 and 10 in DeForest et al. 2022. 
+    """
+    input_dict = {}
+    in_list = list(input_collection)
+
+    for p_angle in in_list:
+        if p_angle == "alpha":
+            break
+        input_dict[input_collection[p_angle].meta["POLAR"]] = input_collection[p_angle].data
+
+    alpha = input_collection["alpha"].data * u.radian
+    B = (2 / 3) * (np.sum([ith_polarizer_brightness for ith_angle, ith_polarizer_brightness
+                           in input_dict.items() if ith_angle != "alpha"], axis=0))
+
+    pB = (-4 / 3) * (np.sum([ith_polarizer_brightness
+                             * np.cos(2 * (ith_angle - alpha))
+                             for ith_angle, ith_polarizer_brightness
+                             in input_dict.items() if ith_angle != "alpha"], axis=0))
+
+    pBp = (-4 / 3) * (np.sum([ith_polarizer_brightness * np.sin(2 * (ith_angle - alpha))
+                              for ith_angle, ith_polarizer_brightness
+                              in input_dict.items() if ith_angle != "alpha"], axis=0))
+    # TODO: update header properly
+    metaB, metapB, metapBp = copy.copy(input_collection["M"].meta), copy.copy(input_collection["M"].meta), copy.copy(
+        input_collection["M"].meta)
+    metaB.update(POLAR="B")
+    metapB.update(POLAR="pB")
+    metapBp.update(POLAR="pB-prime")
+
+    mask = combine_all_collection_masks(input_collection)
+    Bp3_cube = [("B", NDCube(B, wcs=input_collection["M"].wcs, mask=mask, meta=metaB)),
+                ("pB", NDCube(pB, wcs=input_collection["M"].wcs, mask=mask, meta=metapB)),
+                ("pBp", NDCube(pBp, wcs=input_collection["M"].wcs, mask=mask, meta=metapBp)),
+                ("alpha", NDCube(alpha, wcs=input_collection["M"].wcs, mask=mask))]
+    # TODO: WCS for alpha needs to be generated wrt to solar north
+
+    return NDCollection(Bp3_cube, meta={}, aligned_axes="all")
+
+
+@use_alpha
+def bp3_to_mzp(input_collection, **kwargs):
+    """
+    Notes
+    -----
+    Equation 11 in DeForest et al. 2022.
+    """
+
+    if "alpha" not in input_collection:
+        msg = "missing alpha"
+        raise ValueError(msg)
+
+    B, pB, pBp = input_collection["B"].data, input_collection["pB"].data, input_collection["pBp"].data
+    alpha = input_collection["alpha"].data * u.radian
+
+    mzp_angles = [-60, 0, 60] * u.degree
+    Bmzp = {}
+    for angle in mzp_angles:
+        Bmzp[angle] = (1 / 2) * (B - np.cos(2 * (angle - alpha)) * pB -
+                                 np.cos(2 * (angle - alpha)) * pBp)
+
+    metaM, metaZ, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["pB"].meta), copy.copy(
+        input_collection["pBp"].meta)
+    metaM.update(POLAR=-60*u.degree)
+    metaZ.update(POLAR=0*u.degree)
+    metaP.update(POLAR=60*u.degree)
+
+    mask = combine_all_collection_masks(input_collection)
+    Bmzp_cube = [("M", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaM)),
+                 ("Z", NDCube(Bmzp[0 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaZ)),
+                 ("P", NDCube(Bmzp[60 * u.degree], wcs=input_collection["B"].wcs, mask=mask, meta=metaP)),
+                 ("alpha", NDCube(alpha, wcs=input_collection["B"].wcs, mask=mask))]
+
+    return NDCollection(Bmzp_cube, meta={}, aligned_axes="all")
+
+
+@use_alpha
+def btbr_to_mzp(input_collection, **kwargs):
+    """Notes
+    -----
+    Equation 3 in DeForest et al. 2022.
+    """
+    if "alpha" not in input_collection:
+        msg = "missing alpha"
+        raise ValueError(msg)
+
+    alpha = input_collection["alpha"].data * u.radian
+    Bt = input_collection["Bt"].data
+    Br = input_collection["Br"].data
+
+    mzp_angles = [-60, 0, 60] * u.degree
+    Bmzp = {}
+    for angle in mzp_angles:
+        Bmzp[angle] = Bt * (np.sin(angle - alpha)) ** 2 + Br * (np.cos(angle - alpha)) ** 2
+
+    metaM, metaZ, metaP = copy.copy(input_collection["Bt"].meta), copy.copy(input_collection["Bt"].meta), copy.copy(
+        input_collection["Bt"].meta)
+    metaM.update(POLAR=-60*u.degree)
+    metaZ.update(POLAR=0*u.degree)
+    metaP.update(POLAR=60*u.degree)
+
+    mask = combine_all_collection_masks(input_collection)
+    Bmzp_cube = [("M", NDCube(Bmzp[-60 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaM)),
+                 ("Z", NDCube(Bmzp[0 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaZ)),
+                 ("P", NDCube(Bmzp[60 * u.degree], wcs=input_collection["Bt"].wcs, mask=mask, meta=metaP)),
+                 ("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask))]
+
+    return NDCollection(Bmzp_cube, meta={}, aligned_axes="all")
+
+
+@use_alpha
+def bp3_to_bthp(input_collection, **kwargs):
+    """
+    Notes
+    -----
+    Equations 9, 15, 16 in DeForest et al. 2022.
+    """
+    if "alpha" not in input_collection:
+        msg = "missing alpha"
+        raise ValueError(msg)
+
+    B, pB, pBp = input_collection["B"].data, input_collection["pB"].data, input_collection["pBp"].data
+    alpha = input_collection["alpha"].data * u.radian
+
+    theta = (1 / 2) * np.arctan2(pBp, pB) * u.radian + np.pi / 2 * u.radian + alpha
+    p = np.sqrt(pB ** 2 + pBp ** 2) / B
+
+    metaTh, metaP = copy.copy(input_collection["B"].meta), copy.copy(input_collection["pB"].meta)
+    metaTh.update(POLAR="Theta")
+    metaP.update(POLAR="Degree of Polarization")
+
+    mask = combine_all_collection_masks(input_collection)
+    Bthp_cube = [("B", NDCube(B, wcs=input_collection["B"].wcs, mask=mask, meta=input_collection["B"].meta)),
+                 ("theta", NDCube(theta, wcs=input_collection["B"].wcs, mask=mask, meta=metaTh)),
+                 ("p", NDCube(p, wcs=input_collection["B"].wcs, mask=mask, meta=metaP))]
+
+    return NDCollection(Bthp_cube, meta={}, aligned_axes="all")
+
+
+@use_alpha
+@u.quantity_input(out_angles="angle")
+def btbr_to_npol(input_collection, out_angles=None, **kwargs):
+    """Notes
+    -----
+    Equation 3 in DeForest et al. 2022.
+    out_angles: angles of the output polarizers, as an astropy Quantity in degrees
+    """
+    if "alpha" not in input_collection:
+        msg = "missing alpha"
+        raise ValueError(msg)
+
+    if out_angles is None:
+        msg = "out_angles must be defined as a list of floats"
+        raise ValueError(msg)
+
+    alpha = input_collection["alpha"].data * u.radian
+    Bt, Br = input_collection["Bt"].data, input_collection["Br"].data
+
+    Bnpol = {}
+    Bnpol_cube = []
+    mask = combine_all_collection_masks(input_collection)
+    for angle in out_angles:
+        Bnpol[angle] = Bt * (np.sin(angle - alpha)) ** 2 + Br * (np.cos(angle - alpha)) ** 2
+        meta_tmp = copy.copy(input_collection["Bt"].meta)
+        meta_tmp.update(POLAR=angle)
+        Bnpol_cube.append((str(angle), NDCube(Bnpol[angle], wcs=input_collection["Bt"].wcs, mask=mask, meta=meta_tmp)))
+    Bnpol_cube.append(("alpha", NDCube(alpha, wcs=input_collection["Bt"].wcs, mask=mask)))
+
+    return NDCollection(Bnpol_cube, meta={}, aligned_axes="all")
+
+
+@u.quantity_input(out_angles="angle")
+def mzp_to_npol(input_collection, out_angles=None, offset_angle=0, **kwargs):
+    """Notes
+    -----
+    Equation 45 in DeForest et al. 2022.
+    out_angles: angles of the output polarizers, as an astropy Quantity in degrees
+    """
+    if out_angles is None:
+        msg = "out_angles must be defined as a list of floats"
+        raise ValueError(msg)
+
+    in_list = list(input_collection)
+    input_dict = {}
+
+    for p_angle in in_list:
+        if p_angle == "alpha":
+            break
+        input_dict[input_collection[p_angle].meta["POLAR"]] = input_collection[p_angle].data
+
+    npol_ang = out_angles
+    Bnpol = {}
+    Bnpol_cube = []
+    mask = combine_all_collection_masks(input_collection)
+    for ang in npol_ang:
+        Bnpol[ang] = (1/3) * np.sum([v.data * (4 * np.power(np.cos(ang - k - offset_angle), 2)) - 1 for k, v in input_dict.items()], axis=0)
+        meta_tmp = copy.copy(input_collection[in_list[0]].meta)
+        meta_tmp.update(POLAR=ang)
+        Bnpol_cube.append((str(ang), NDCube(Bnpol[ang], wcs=input_collection[in_list[0]].wcs, mask=mask, meta=meta_tmp)))
+
+    if "alpha" in input_collection:
+        alpha = input_collection["alpha"].data * u.radian
+        Bnpol_cube.append(("alpha", NDCube(alpha, wcs=input_collection[in_list[0]].wcs, mask=mask)))
+
+    return NDCollection(Bnpol_cube, meta={}, aligned_axes="all")
+
+
+def fourpol_to_stokes(input_collection, **kwargs):
+    """
+    Notes
+    -----
+    Table 1 in DeForest et al. 2022.
+
+    """
+    Bi = input_collection[str(0 * u.degree)].data + input_collection[str(90 * u.degree)].data
+    Bq = input_collection[str(90 * u.degree)].data - input_collection[str(0 * u.degree)].data
+    Bu = input_collection[str(135 * u.degree)].data - input_collection[str(45 * u.degree)].data
+
+    metaI, metaQ, metaU = (copy.copy(input_collection[str(0 * u.degree)].meta),
+                           copy.copy(input_collection[str(0 * u.degree)].meta),
+                           copy.copy(input_collection[str(0 * u.degree)].meta))
+    mask = combine_all_collection_masks(input_collection)
+    BStokes_cube = [("I", NDCube(Bi, wcs=input_collection[str(0 * u.degree)].wcs, mask=mask, meta=metaI)),
+                    ("Q", NDCube(Bq, wcs=input_collection[str(0 * u.degree)].wcs, mask=mask, meta=metaQ)),
+                    ("U", NDCube(Bu, wcs=input_collection[str(0 * u.degree)].wcs, mask=mask, meta=metaU))]
+
+    return NDCollection(BStokes_cube, meta={}, aligned_axes="all")
diff --git a/solpolpy/util.py b/solpolpy/util.py
new file mode 100644
index 0000000..eeab0ed
--- /dev/null
+++ b/solpolpy/util.py
@@ -0,0 +1,34 @@
+import numpy as np
+from ndcube import NDCollection
+
+from solpolpy.errors import MissingAlphaError
+
+
+def combine_all_collection_masks(collection: NDCollection) -> np.ndarray | None:
+    """Combine all the masks in a given collection."""
+    return combine_masks(*[cube.mask for key, cube in collection.items() if key != "alpha"])
+
+
+def combine_masks(*args) -> np.ndarray | None:
+    """Combine masks.
+
+    If any of the masks are None, the result is None.
+    Otherwise, any value that is masked in any of the input masks is masked in the result, i.e. a logical OR.
+    """
+    if any(arg is None for arg in args):
+        return None
+    else:
+        return np.logical_or.reduce(args)
+
+
+def use_alpha(transform):
+    """Require a transform to use the alpha parameter and check for its existence."""
+
+    def wrapped(input_collection, *func_args, **func_kwargs):
+        if "alpha" not in input_collection:
+            msg = "alpha expected in input_collection but not found."
+ raise MissingAlphaError(msg) + return transform(input_collection, *func_args, **func_kwargs) + wrapped.uses_alpha = True + + return wrapped diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 0000000..1beab76 --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,141 @@ +import astropy.units as u +import astropy.wcs +import numpy as np +from ndcube import NDCollection, NDCube +from pytest import fixture + +wcs = astropy.wcs.WCS(naxis=3) +wcs.ctype = "WAVE", "HPLT-TAN", "HPLN-TAN" +wcs.cdelt = 0.2, 0.5, 0.4 +wcs.cunit = "Angstrom", "deg", "deg" +wcs.crpix = 0, 2, 2 +wcs.crval = 10, 0.5, 1 + + +@fixture() +def npol_ones(): + data_out = [(str(60 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 60*u.degree, "OBSRVTRY": "LASCO"})), + (str(0 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 0*u.degree, "OBSRVTRY": "LASCO"})), + (str(-60 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": -60*u.degree, "OBSRVTRY": "LASCO"}))] + # data_out.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def fourpol_ones(): + data_out = [(str(0 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 0*u.degree})), + (str(45 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 45*u.degree})), + (str(90 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 90*u.degree})), + (str(135 * u.degree), NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 135*u.degree}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def mzp_ones(): + data_out = [("P", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 60*u.degree})), + ("Z", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 0*u.degree})), + ("M", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": -60*u.degree}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def mzp_data(): + data_out = [("P", NDCube(np.random.random([50, 50]), wcs=wcs, meta={"POLAR": 60*u.degree})), + ("Z", NDCube(np.random.random([50, 50]), wcs=wcs, meta={"POLAR": 0*u.degree})), + ("M", NDCube(np.random.random([50, 50]), wcs=wcs, meta={"POLAR": -60*u.degree}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bpb_data(): + data_out = [("B", NDCube(np.random.random([50, 50]), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.random.random([50, 50]), wcs=wcs, meta={"POLAR": "pB"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def mzp_ones_alpha(): + data_out = [("P", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 60*u.degree})), + ("Z", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 0*u.degree})), + ("M", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": -60*u.degree})), + ("alpha", NDCube(np.array([0]), wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bpb_zeros(): + data_out = [("B", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": "pB"})), + ("alpha", NDCube(np.array([0]) * u.degree, wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bpb_ones(): + data_out = [("B", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "pB"})), + ("alpha", NDCube(np.array([[0]]), wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bpb_ones_no_alpha(): + data_out = [("B", NDCube(np.array([[1]]), wcs=wcs, 
meta={"POLAR": "B"})), + ("pB", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "pB"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def btbr_ones(): + data_out = [("Bt", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "Bt"})), + ("Br", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "Br"})), + ("alpha", NDCube(np.array([[0]]) * u.degree, wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def btbr_ones_no_alpha(): + data_out = [("Bt", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "Bt"})), + ("Br", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "Br"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def stokes_ones(): + data_out = [("I", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "I"})), + ("Q", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "Q"})), + ("U", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "U"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bp3_ones(): + data_out = [("B", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "pB"})), + ("pBp", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": "pBp"})), + ("alpha", NDCube(np.array([[0]]) * u.degree, wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bp3_ones_no_alpha(): + data_out = [("B", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pB"})), + ("pBp", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pBp"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def bthp_ones(): + data_out = [("B", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "B"})), + ("theta", NDCube(np.array([1]) * u.degree, wcs=wcs, meta={"POLAR": "Theta"})), + ("p", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Degree of Polarization"}))] + return NDCollection(data_out, meta={}, aligned_axes="all") + + +@fixture() +def example_fail(): + data_out = [("B", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "B"})), + ("Bm", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bm"})), + ("alpha", NDCube(np.array([0]) * u.degree, wcs=wcs))] + return NDCollection(data_out, meta={}, aligned_axes="all") diff --git a/tests/test_core.py b/tests/test_core.py index 207ffed..4a757e5 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,6 +1,4 @@ -# -*- coding: utf-8 -*- -""" -pytest test suite for the polarizers module of solpolpy +"""pytest test suite for the polarizers module of solpolpy """ import astropy.units as u @@ -8,154 +6,29 @@ import numpy as np import pytest from ndcube import NDCollection, NDCube -from pytest import fixture -from solpolpy.constants import VALID_KINDS from solpolpy.core import _determine_image_shape, add_alpha, determine_input_kind, get_transform_path, resolve from solpolpy.errors import UnsupportedTransformationError +from solpolpy.graph import System +from tests.fixtures import * wcs = astropy.wcs.WCS(naxis=3) -wcs.ctype = 'WAVE', 'HPLT-TAN', 'HPLN-TAN' +wcs.ctype = "WAVE", "HPLT-TAN", "HPLN-TAN" wcs.cdelt = 0.2, 0.5, 0.4 -wcs.cunit = 'Angstrom', 'deg', 'deg' +wcs.cunit = "Angstrom", "deg", "deg" wcs.crpix = 0, 2, 2 wcs.crval = 10, 0.5, 1 -@fixture -def npol_ones(): - data_out = [] - data_out.append(("B60.0", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 60, 'OBSRVTRY': 'LASCO'}))) - data_out.append(("B0.0", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 0, 'OBSRVTRY': 'LASCO'}))) - 
data_out.append(("B-60.0", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': -60, 'OBSRVTRY': 'LASCO'}))) - # data_out.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def fourpol_ones(): - data_out = [] - data_out.append(("B0", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("B45", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 45}))) - data_out.append(("B90", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 90}))) - data_out.append(("B135", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 135}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def mzp_ones(): - data_out = [] - data_out.append(("Bp", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 60}))) - data_out.append(("Bz", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("Bm", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': -60}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def mzp_data(): - data_out = [] - data_out.append(("Bp", NDCube(np.random.random([50,50]), wcs=wcs, meta={'POLAR': 60}))) - data_out.append(("Bz", NDCube(np.random.random([50,50]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("Bm", NDCube(np.random.random([50,50]), wcs=wcs, meta={'POLAR': -60}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bpb_data(): - data_out = [] - data_out.append(("B", NDCube(np.random.random([50,50]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.random.random([50,50]), wcs=wcs, meta={'POLAR': 'pB'}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def mzp_ones_alpha(): - data_out = [] - data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 60}))) - data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': -60}))) - data_out.append(("alpha", NDCube(np.array([0]), wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bpb_ones(): - data_out = [] - data_out.append(("B", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'pB'}))) - data_out.append(("alpha", NDCube(np.array([[0]]), wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bpb_ones_no_alpha(): - data_out = [] - data_out.append(("B", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'pB'}))) - # data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def btbr_ones(): - data_out = [] - data_out.append(("Bt", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Bt'}))) - data_out.append(("Br", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Br'}))) - data_out.append(("alpha", NDCube(np.array([[0]])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def btbr_ones_no_alpha(): - data_out = [] - data_out.append(("Bt", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Bt'}))) - data_out.append(("Br", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Br'}))) - # data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def stokes_ones(): - data_out = [] - data_out.append(("Bi", 
NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Bi'}))) - data_out.append(("Bq", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Bq'}))) - data_out.append(("Bu", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'Bu'}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bp3_ones(): - data_out = [] - data_out.append(("B", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'pB'}))) - data_out.append(("pBp", NDCube(np.array([[1]]), wcs=wcs, meta={'POLAR': 'pBp'}))) - data_out.append(("alpha", NDCube(np.array([[0]])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bp3_ones_no_alpha(): - data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pB'}))) - data_out.append(("pBp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pBp'}))) - # data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def bthp_ones(): - data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("theta", NDCube(np.array([1])*u.degree, wcs=wcs, meta={'POLAR': 'Theta'}))) - data_out.append(("p", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Degree of Polarization'}))) - return NDCollection(data_out, meta={}, aligned_axes="all") - -@fixture -def example_fail(): - data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bm'}))) - data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") def test_determine_image_shape(): - data_out = [] - data_out.append(("B", NDCube(np.zeros((20, 20)), wcs=wcs, meta={'POLAR': 'B'}))) + data_out = [("B", NDCube(np.zeros((20, 20)), wcs=wcs, meta={"POLAR": "B"}))] collection = NDCollection(data_out, meta={}, aligned_axes="all") assert _determine_image_shape(collection) == (20, 20) def test_add_alpha(bpb_ones_no_alpha): - data_out = [] - data_out.append(("B", NDCube(np.zeros((10, 10)), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.zeros((10, 10)), wcs=wcs, meta={'POLAR': 'pB'}))) + data_out = [("B", NDCube(np.zeros((10, 10)), wcs=wcs, meta={"POLAR": "B"})), + ("pB", NDCube(np.zeros((10, 10)), wcs=wcs, meta={"POLAR": "pB"}))] collection = NDCollection(data_out, meta={}, aligned_axes="all") assert "alpha" not in collection out = add_alpha(collection) @@ -164,29 +37,43 @@ def test_add_alpha(bpb_ones_no_alpha): @pytest.mark.parametrize( "fixture_name, kind", - [("npol_ones", "npol"), ("fourpol_ones", "fourpol"), ("mzp_ones", "mzp"), ("mzp_ones_alpha", "mzp"), - ("bpb_ones", "bpb"), ("bpb_ones_no_alpha", "bpb"), ("btbr_ones", "btbr"), ("btbr_ones_no_alpha", "btbr"), - ("stokes_ones", "stokes"), ("bp3_ones", "bp3"), ("bp3_ones_no_alpha", "bp3"), ("bthp_ones", "bthp"),] + [("npol_ones", System.npol), + ("fourpol_ones", System.fourpol), + ("mzp_ones", System.mzp), + ("mzp_ones_alpha", System.mzp), + ("bpb_ones", System.bpb), + ("bpb_ones_no_alpha", System.bpb), + ("btbr_ones", System.btbr), + ("btbr_ones_no_alpha", System.btbr), + ("stokes_ones", System.stokes), + ("bp3_ones", System.bp3), + ("bp3_ones_no_alpha", System.bp3), + ("bthp_ones", System.bthp)], ) def test_determine_input_kind(fixture_name, kind, 
request): assert determine_input_kind(request.getfixturevalue(fixture_name)) == kind def test_determine_input_kind_fail(example_fail): - with pytest.raises(ValueError): + with pytest.raises(UnsupportedTransformationError): determine_input_kind(example_fail) def test_check_all_paths_resolve(request): - for kind_source in VALID_KINDS.keys(): - for kind_target in VALID_KINDS.keys(): - if kind_source != kind_target: # don't need to convert if the input and output systems match + for source_system in System: + for target_system in System: + # source_system = System.npol + # target_system = System.bpb + if source_system != target_system: # don't need to convert if the input and output systems are the same try: - get_transform_path(kind_source, kind_target) + path = get_transform_path(source_system, target_system) except UnsupportedTransformationError: pass # skip any time the transform shouldn't be possible else: - result = resolve(request.getfixturevalue(f"{kind_source}_ones"), kind_target, out_angles=[0]) + print(source_system, target_system, path) + result = resolve(request.getfixturevalue(f"{source_system}_ones"), + target_system, + out_angles=[0]*u.deg) assert isinstance(result, NDCollection) diff --git a/tests/test_instruments.py b/tests/test_instruments.py index 0f6b90f..07d5398 100644 --- a/tests/test_instruments.py +++ b/tests/test_instruments.py @@ -9,7 +9,7 @@ def test_load_data(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts", path_to_test_files+"stereo_120.fts", path_to_test_files+"stereo_240.fts"] @@ -22,7 +22,7 @@ def test_load_data(): def test_load_data_fails_with_one_file(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts"] with pytest.raises(TooFewFilesError): out = load_data(file_list) @@ -30,13 +30,13 @@ def test_load_data_fails_with_one_file(): def test_get_instrument_mask(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts", path_to_test_files+"stereo_120.fts", path_to_test_files+"stereo_240.fts"] out = load_data(file_list) - mask = get_instrument_mask(out['B0.0'].meta) + mask = get_instrument_mask(out["B0.0"].meta) assert mask.dtype == bool assert mask.shape == (2048, 2048) @@ -44,11 +44,11 @@ def test_get_instrument_mask(): def test_use_instrument_mask_overrides_in_load(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts", path_to_test_files+"stereo_120.fts", path_to_test_files+"stereo_240.fts"] out = load_data(file_list, use_instrument_mask=True) - assert out['B0.0'].mask.dtype == bool - assert out['B0.0'].mask.shape == (2048, 2048) + assert out["B0.0"].mask.dtype == bool + assert out["B0.0"].mask.shape == (2048, 2048) diff --git a/tests/test_plotting.py b/tests/test_plotting.py index 913540c..ea945ac 100644 --- a/tests/test_plotting.py +++ b/tests/test_plotting.py @@ -6,38 +6,38 @@ def test_get_colormap_str_stereo(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = 
TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts", path_to_test_files+"stereo_120.fts", path_to_test_files+"stereo_240.fts"] out = load_data(file_list) - found = get_colormap_str(out['B0.0'].meta) + found = get_colormap_str(out["B0.0"].meta) assert found == "stereocor2" def test_get_colormap_str_lasco(): TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"lasco_0.fts", path_to_test_files+"lasco_-60.fts", path_to_test_files+"lasco_+60.fts"] out = load_data(file_list) - found = get_colormap_str(out['B0.0'].meta) + found = get_colormap_str(out["B0.0"].meta) assert found == "soholasco2" def test_get_colormap_str_unknown(): - meta = {'INSTRUME': 'unknown'} + meta = {"INSTRUME": "unknown"} found = get_colormap_str(meta) assert found == "soholasco2" def test_plot_collection_runs(): - """ it's hard to test matplotlib things... so this at least makes sure it runs without error""" + """It's hard to test matplotlib things... so this at least makes sure it runs without error""" TESTDATA_DIR = os.path.dirname(__file__) - path_to_test_files = TESTDATA_DIR+'/test_support_files/' + path_to_test_files = TESTDATA_DIR+"/test_support_files/" file_list=[path_to_test_files+"stereo_0.fts", path_to_test_files+"stereo_120.fts", path_to_test_files+"stereo_240.fts"] diff --git a/tests/test_polarizers.py b/tests/test_polarizers.py index 819e53d..f8ad5ea 100644 --- a/tests/test_polarizers.py +++ b/tests/test_polarizers.py @@ -1,178 +1,175 @@ import astropy.units as u import astropy.wcs import numpy as np +import pytest from ndcube import NDCollection, NDCube from pytest import fixture -import solpolpy.polarizers as pol +import solpolpy.transforms as transforms +from solpolpy.errors import MissingAlphaError +from tests.fixtures import * wcs = astropy.wcs.WCS(naxis=3) -wcs.ctype = 'WAVE', 'HPLT-TAN', 'HPLN-TAN' -wcs.cunit = 'Angstrom', 'deg', 'deg' +wcs.ctype = "WAVE", "HPLT-TAN", "HPLN-TAN" +wcs.cunit = "Angstrom", "deg", "deg" wcs.cdelt = 0.2, 0.5, 0.4 wcs.crpix = 0, 2, 2 wcs.crval = 10, 0.5, 1 -wcs.cname = 'wavelength', 'HPC lat', 'HPC lon' - - -@fixture -def npol_mzp_zeros(): - data_out = [] - data_out.append(("Bp", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 60.0, 'OBSRVTRY': 'STEREO_A'}))) - data_out.append(("Bz", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 0.0, 'OBSRVTRY': 'STEREO_A'}))) - data_out.append(("Bm", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': -60.0, 'OBSRVTRY': 'STEREO_A'}))) - data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - - -def test_npol_mzp_zeros(npol_mzp_zeros): - actual = pol.npol_to_mzp(npol_mzp_zeros) - expected_data = [] - expected_data.append(("Bm", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([0]), wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - - -@fixture -def npol_mzp_ones(): - data_out = [] - data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 60, 'OBSRVTRY': 'STEREO_B'}))) - data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 0, 'OBSRVTRY': 'STEREO_B'}))) - data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': -60, 'OBSRVTRY': 'STEREO_B'}))) - return 
NDCollection(data_out, meta={}, aligned_axes="all") - - -def test_npol_mzp_ones(npol_mzp_ones): - actual = pol.npol_to_mzp(npol_mzp_ones) - expected_data = [] - expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - - -@fixture -def mzp_ones_alpha(): - data_out = [] - data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 60, 'OBSRVTRY': 'LASCO'}))) - data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 0, 'OBSRVTRY': 'LASCO'}))) - data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': -60, 'OBSRVTRY': 'LASCO'}))) - data_out.append(("alpha", NDCube(np.array([0]), wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - - -def test_npol_mzp_ones_alpha(mzp_ones_alpha): - actual = pol.npol_to_mzp(mzp_ones_alpha) - expected_data = [] - expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("alpha", NDCube(np.array([0]), wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - - -@fixture -def mzp_zeros(): - data_out = [] - data_out.append(("Bm", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 60}))) - data_out.append(("Bz", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("Bp", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': -60}))) - data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - - -def test_mzp_bpb_zeros(mzp_zeros): - actual = pol.mzp_to_bpb(mzp_zeros) - expected_data = [] - expected_data.append(("B", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("pB", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - - -@fixture -def mzp_ones(): - data_out = [] - data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 60}))) - data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': -60}))) - data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return NDCollection(data_out, meta={}, aligned_axes="all") - - -def test_mzp_bpb_ones(mzp_ones): - actual = pol.mzp_to_bpb(mzp_ones) - expected_data = [] - expected_data.append(("B", NDCube(np.array([2]), wcs=wcs))) - expected_data.append(("pB", NDCube(np.zeros(1), wcs=wcs))) - expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - - -@fixture -def bpb_zeros(): - data_out = [] - data_out.append(("B", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([0]), wcs=wcs, meta={'POLAR': 'pB'}))) - data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) - return 
NDCollection(data_out, meta={}, aligned_axes="all") - +wcs.cname = "wavelength", "HPC lat", "HPC lon" + +# +# @fixture() +# def npol_mzp_zeros(): +# data_out = [] +# data_out.append(("Bp", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": 60.0, "OBSRVTRY": "STEREO_A"}))) +# data_out.append(("Bz", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": 0.0, "OBSRVTRY": "STEREO_A"}))) +# data_out.append(("Bm", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": -60.0, "OBSRVTRY": "STEREO_A"}))) +# data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) +# return NDCollection(data_out, meta={}, aligned_axes="all") +# +# +# def test_npol_mzp_zeros(npol_mzp_zeros): +# actual = transforms.npol_to_mzp(npol_mzp_zeros) +# expected_data = [] +# expected_data.append(("Bm", NDCube(np.array([0]), wcs=wcs))) +# expected_data.append(("Bz", NDCube(np.array([0]), wcs=wcs))) +# expected_data.append(("Bp", NDCube(np.array([0]), wcs=wcs))) +# expected = NDCollection(expected_data, meta={}, aligned_axes="all") +# for k in list(expected): +# assert np.allclose(actual[str(k)].data, expected[str(k)].data) +# +# +# @fixture() +# def npol_mzp_ones(): +# data_out = [] +# data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 60, "OBSRVTRY": "STEREO_B"}))) +# data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 0, "OBSRVTRY": "STEREO_B"}))) +# data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": -60, "OBSRVTRY": "STEREO_B"}))) +# return NDCollection(data_out, meta={}, aligned_axes="all") +# +# +# def test_npol_mzp_ones(npol_mzp_ones): +# actual = transforms.npol_to_mzp(npol_mzp_ones) +# expected_data = [] +# expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) +# expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) +# expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) +# expected = NDCollection(expected_data, meta={}, aligned_axes="all") +# for k in list(expected): +# assert np.allclose(actual[str(k)].data, expected[str(k)].data) +# +# +# @fixture() +# def mzp_ones_alpha(): +# data_out = [] +# data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 60, "OBSRVTRY": "LASCO"}))) +# data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 0, "OBSRVTRY": "LASCO"}))) +# data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": -60, "OBSRVTRY": "LASCO"}))) +# data_out.append(("alpha", NDCube(np.array([0]), wcs=wcs))) +# return NDCollection(data_out, meta={}, aligned_axes="all") +# +# +# def test_npol_mzp_ones_alpha(mzp_ones_alpha): +# actual = transforms.npol_to_mzp(mzp_ones_alpha) +# expected_data = [] +# expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) +# expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) +# expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) +# expected_data.append(("alpha", NDCube(np.array([0]), wcs=wcs))) +# expected = NDCollection(expected_data, meta={}, aligned_axes="all") +# for k in list(expected): +# assert np.allclose(actual[str(k)].data, expected[str(k)].data) +# +# +# @fixture() +# def mzp_zeros(): +# data_out = [] +# data_out.append(("Bm", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": 60}))) +# data_out.append(("Bz", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": 0}))) +# data_out.append(("Bp", NDCube(np.array([0]), wcs=wcs, meta={"POLAR": -60}))) +# data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) +# return NDCollection(data_out, meta={}, aligned_axes="all") +# +# +# def test_mzp_bpb_zeros(mzp_zeros): +# actual = transforms.mzp_to_bpb(mzp_zeros) 
+# expected_data = [] +# expected_data.append(("B", NDCube(np.array([0]), wcs=wcs))) +# expected_data.append(("pB", NDCube(np.array([0]), wcs=wcs))) +# expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) +# expected = NDCollection(expected_data, meta={}, aligned_axes="all") +# for k in list(expected): +# assert np.allclose(actual[str(k)].data, expected[str(k)].data) +# +# +# @fixture() +# def mzp_ones(): +# data_out = [] +# data_out.append(("Bm", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 60}))) +# data_out.append(("Bz", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": 0}))) +# data_out.append(("Bp", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": -60}))) +# data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) +# return NDCollection(data_out, meta={}, aligned_axes="all") +# +# +# def test_mzp_bpb_ones(mzp_ones): +# actual = transforms.mzp_to_bpb(mzp_ones) +# expected_data = [] +# expected_data.append(("B", NDCube(np.array([2]), wcs=wcs))) +# expected_data.append(("pB", NDCube(np.zeros(1), wcs=wcs))) +# expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) +# expected = NDCollection(expected_data, meta={}, aligned_axes="all") +# for k in list(expected): +# assert np.allclose(actual[str(k)].data, expected[str(k)].data) +# +# + +# def test_bpb_mzp_zeros(bpb_zeros): - actual = pol.bpb_to_mzp(bpb_zeros) + actual = transforms.bpb_to_mzp(bpb_zeros) expected_data = [] - expected_data.append(("Bm", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("M", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("Z", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("P", NDCube(np.array([0]), wcs=wcs))) expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def bpb_ones(): data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pB'}))) + data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "B"}))) + data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pB"}))) data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_bpb_mzp_ones(bpb_ones): - actual = pol.bpb_to_mzp(bpb_ones) + actual = transforms.bpb_to_mzp(bpb_ones) expected_data = [] - expected_data.append(("Bm", NDCube(np.array([3/4]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([3/4]), wcs=wcs))) + expected_data.append(("M", NDCube(np.array([3/4]), wcs=wcs))) + expected_data.append(("Z", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("P", NDCube(np.array([3/4]), wcs=wcs))) expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def btbr_bpb_ones(): data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pB'}))) + data_out.append(("B", NDCube(np.array([1]), wcs=wcs, 
meta={"POLAR": "B"}))) + data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pB"}))) data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_bpb_btbr_ones(btbr_bpb_ones): - actual = pol.bpb_to_btbr(btbr_bpb_ones) + actual = transforms.bpb_to_btbr(btbr_bpb_ones) expected_data = [] expected_data.append(("Bt", NDCube(np.array([1]), wcs=wcs))) expected_data.append(("Br", NDCube(np.array([0]), wcs=wcs))) @@ -182,17 +179,17 @@ def test_bpb_btbr_ones(btbr_bpb_ones): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def btbr_ones(): data_out = [] - data_out.append(("Bt", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bt'}))) - data_out.append(("Br", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Br'}))) + data_out.append(("Bt", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bt"}))) + data_out.append(("Br", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Br"}))) data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_btbr_bpb_ones(btbr_ones): - actual = pol.btbr_to_bpb(btbr_ones) + actual = transforms.btbr_to_bpb(btbr_ones) expected_data = [] expected_data.append(("B", NDCube(np.array([2]), wcs=wcs))) expected_data.append(("pB", NDCube(np.array([0]), wcs=wcs))) @@ -203,98 +200,91 @@ def test_btbr_bpb_ones(btbr_ones): def test_btbr_mzp_ways(btbr_ones): - actual_mzp_direct = pol.btbr_to_mzp(btbr_ones) - actual_bpb = pol.btbr_to_bpb(btbr_ones) - actual_mzp_indirect = pol.bpb_to_mzp(actual_bpb) + actual_mzp_direct = transforms.btbr_to_mzp(btbr_ones) + actual_bpb = transforms.btbr_to_bpb(btbr_ones) + actual_mzp_indirect = transforms.bpb_to_mzp(actual_bpb) for k in list(actual_mzp_direct): assert np.allclose(actual_mzp_direct[str(k)].data, actual_mzp_indirect[str(k)].data) def test_mzp_stokes_ones(mzp_ones): - actual = pol.mzp_to_stokes(mzp_ones) + actual = transforms.mzp_to_stokes(mzp_ones) expected_data = [] - expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bu", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("I", NDCube(np.array([2]), wcs=wcs))) + expected_data.append(("Q", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("U", NDCube(np.array([0]), wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def stokes_ones(): data_out = [] - data_out.append(("Bi", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bi'}))) - data_out.append(("Bq", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bq'}))) - data_out.append(("Bu", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bu'}))) + data_out.append(("I", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bi"}))) + data_out.append(("Q", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bq"}))) + data_out.append(("U", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bu"}))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_stokes_mzp_ones(stokes_ones): - actual = pol.stokes_to_mzp(stokes_ones) + actual = transforms.stokes_to_mzp(stokes_ones) expected_data = [] - expected_data.append(("Bm", NDCube(np.array([(-np.sqrt(3)+1)/4]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([(np.sqrt(3)+1)/4]), wcs=wcs))) + 
expected_data.append(("M", NDCube(np.array([(-np.sqrt(3)+1)/4]), wcs=wcs))) + expected_data.append(("Z", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("P", NDCube(np.array([(np.sqrt(3)+1)/4]), wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -def test_mzp_bp3_ones(mzp_ones): - actual = pol.mzp_to_bp3(mzp_ones) - expected_data = [] - expected_data.append(("B", NDCube(np.array([2]), wcs=wcs))) - expected_data.append(("pB", NDCube(np.zeros(1), wcs=wcs))) - expected_data.append(("pBp", NDCube(np.zeros(1), wcs=wcs))) - expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) +def test_mzp_bp3_missing_alpha_errors(mzp_ones): + with pytest.raises(MissingAlphaError): + transforms.mzp_to_bp3(mzp_ones) -@fixture +@fixture() def bp3_ones(): data_out = [] - data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'B'}))) - data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pB'}))) - data_out.append(("pBp", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'pBp'}))) + data_out.append(("B", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "B"}))) + data_out.append(("pB", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pB"}))) + data_out.append(("pBp", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "pBp"}))) data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_bp3_mzp_ones(bp3_ones): - actual = pol.bp3_to_mzp(bp3_ones) + actual = transforms.bp3_to_mzp(bp3_ones) expected_data = [] - expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([-0.5]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("M", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("Z", NDCube(np.array([-0.5]), wcs=wcs))) + expected_data.append(("P", NDCube(np.array([1]), wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def btbr_ones_mzp(): data_out = [] - data_out.append(("Bt", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Bt'}))) - data_out.append(("Br", NDCube(np.array([1]), wcs=wcs, meta={'POLAR': 'Br'}))) + data_out.append(("Bt", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Bt"}))) + data_out.append(("Br", NDCube(np.array([1]), wcs=wcs, meta={"POLAR": "Br"}))) data_out.append(("alpha", NDCube(np.array([0])*u.degree, wcs=wcs))) return NDCollection(data_out, meta={}, aligned_axes="all") def test_btbr_mzp_ones(btbr_ones_mzp): - actual = pol.btbr_to_mzp(btbr_ones_mzp) + actual = transforms.btbr_to_mzp(btbr_ones_mzp) expected_data = [] - expected_data.append(("Bm", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bz", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("Bp", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("M", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("Z", NDCube(np.array([1]), wcs=wcs))) + expected_data.append(("P", NDCube(np.array([1]), wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) def test_bp3_bthp_ones(bp3_ones): 
- actual = pol.bp3_to_bthp(bp3_ones) + actual = transforms.bp3_to_bthp(bp3_ones) expected_data = [] expected_data.append(("B", NDCube(np.array([1]), wcs=wcs))) expected_data.append(("theta", NDCube(np.array([5*np.pi/8]), wcs=wcs))) @@ -305,88 +295,87 @@ def test_bp3_bthp_ones(bp3_ones): def test_btbr_npol_ones(btbr_ones): - actual = pol.btbr_to_npol(btbr_ones,[0,120,240]) - expected_data = [] - expected_data.append(("B0", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("B120", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("B240", NDCube(np.array([1]), wcs=wcs))) - expected_data.append(("alpha", NDCube(np.array([0])*u.radian, wcs=wcs))) + actual = transforms.btbr_to_npol(btbr_ones, [0, 120, 240]*u.degree) + expected_data = [(str(0 * u.degree), NDCube(np.array([1]), wcs=wcs)), + (str(120 * u.degree), NDCube(np.array([1]), wcs=wcs)), + (str(240 * u.degree), NDCube(np.array([1]), wcs=wcs)), + ("alpha", NDCube(np.array([0]) * u.radian, wcs=wcs))] expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -def test_mzp_t_npol_ones(npol_mzp_ones): - actual = pol.mzp_to_npol(npol_mzp_ones, out_angles=[0]) - expected_data = [("B0", NDCube(np.array([1]), wcs=wcs))] +def test_mzp_t_npol_ones(mzp_ones): + actual = transforms.mzp_to_npol(mzp_ones, out_angles=[0]*u.degree) + expected_data = [(str(0 * u.degree), NDCube(np.array([1]), wcs=wcs))] expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) -@fixture +@fixture() def fourpol_ones(): wcs = astropy.wcs.WCS(naxis=1) - wcs.ctype = 'ONE' - wcs.cunit = 'deg' + wcs.ctype = "ONE" + wcs.cunit = "deg" wcs.cdelt = 0.1 wcs.crpix = 0 wcs.crval = 0 - wcs.cname = 'ONE' + wcs.cname = "ONE" - data_out = [] - data_out.append(("B0", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("B45", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 45}))) - data_out.append(("B90", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 90}))) - data_out.append(("B135", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 135}))) - return NDCollection(key_data_pairs=data_out, meta={}, aligned_axes='all') + data_out = [(str(0 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 0})), + (str(45 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 45})), + (str(90 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 90})), + (str(135 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 135}))] + return NDCollection(key_data_pairs=data_out, meta={}, aligned_axes="all") def test_fourpol_to_stokes_ones(fourpol_ones): - actual = pol.fourpol_to_stokes(fourpol_ones) + actual = transforms.fourpol_to_stokes(fourpol_ones) expected_data = [] - expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs))) - expected_data.append(("Bu", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("I", NDCube(np.array([2]), wcs=wcs))) + expected_data.append(("Q", NDCube(np.array([0]), wcs=wcs))) + expected_data.append(("U", NDCube(np.array([0]), wcs=wcs))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) def test_mask_propagation_works_when_none_provided(fourpol_ones): - actual = pol.fourpol_to_stokes(fourpol_ones) - expected_data = [] - 
expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs, mask=None))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs, mask=None))) - expected_data.append(("Bu", NDCube(np.array([0]), wcs=wcs, mask=None))) - expected = NDCollection(expected_data, meta={}, aligned_axes="all") - for k in list(expected): - assert np.allclose(actual[str(k)].data, expected[str(k)].data) - assert actual[str(k)].mask is None + actual = transforms.fourpol_to_stokes(fourpol_ones) + expected = NDCollection( + [("I", NDCube(np.array([2]), wcs=wcs, mask=None)), + ("Q", NDCube(np.array([0]), wcs=wcs, mask=None)), + ("U", NDCube(np.array([0]), wcs=wcs, mask=None))], + meta={}, + aligned_axes="all") + for key in list(expected): + assert np.allclose(actual[key].data, expected[key].data) + assert actual[key].mask is None def test_mask_propagation_works_mixed_normal_case(): # set up some data with mixed masks wcs = astropy.wcs.WCS(naxis=1) - wcs.ctype = 'ONE' - wcs.cunit = 'deg' + wcs.ctype = "ONE" + wcs.cunit = "deg" wcs.cdelt = 0.1 wcs.crpix = 0 wcs.crval = 0 - wcs.cname = 'ONE' + wcs.cname = "ONE" - data_out = [] - data_out.append(("B0", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 0}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B45", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 45}, mask=np.ones(1, dtype=bool)))) - data_out.append(("B90", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 90}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B135", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 135}, mask=np.zeros(1, dtype=bool)))) - fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes='all') + data_out = [ + (str(0 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 0}, mask=np.zeros(1, dtype=bool))), + (str(45 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 45}, mask=np.ones(1, dtype=bool))), + (str(90 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 90}, mask=np.zeros(1, dtype=bool))), + (str(135 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 135}, mask=np.zeros(1, dtype=bool)))] + fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes="all") - actual = pol.fourpol_to_stokes(fourpol_ones) + actual = transforms.fourpol_to_stokes(fourpol_ones) expected_data = [] - expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs, mask=np.ones(1, dtype=bool)))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) - expected_data.append(("Bu", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data.append(("I", NDCube(np.array([2]), wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data.append(("Q", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data.append(("U", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) @@ -396,26 +385,26 @@ def test_mask_propagation_works_mixed_normal_case(): def test_mask_propagation_works_all_false_normal_case(): # set up some data with mixed masks wcs = astropy.wcs.WCS(naxis=1) - wcs.ctype = 'ONE' - wcs.cunit = 'deg' + wcs.ctype = "ONE" + wcs.cunit = "deg" wcs.cdelt = 0.1 wcs.crpix = 0 wcs.crval = 0 - wcs.cname = 'ONE' + wcs.cname = "ONE" - data_out = [] - data_out.append(("B0", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 0}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B45", 
NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 45}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B90", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 90}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B135", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 135}, mask=np.zeros(1, dtype=bool)))) - fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes='all') + data_out = [ + (str(0 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 0}, mask=np.zeros(1, dtype=bool))), + (str(45 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 45}, mask=np.ones(1, dtype=bool))), + (str(90 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 90}, mask=np.zeros(1, dtype=bool))), + (str(135 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 135}, mask=np.zeros(1, dtype=bool)))] + fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes="all") - actual = pol.fourpol_to_stokes(fourpol_ones) + actual = transforms.fourpol_to_stokes(fourpol_ones) expected_data = [] - expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs, mask=np.zeros(1, dtype=bool)))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs, mask=np.zeros(1, dtype=bool)))) - expected_data.append(("Bu", NDCube(np.array([0]), wcs=wcs, mask=np.zeros(1, dtype=bool)))) + expected_data.append(("I", NDCube(np.array([2]), wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data.append(("Q", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data.append(("U", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data) @@ -425,26 +414,25 @@ def test_mask_propagation_works_all_false_normal_case(): def test_mask_propagation_works_when_not_all_specified(fourpol_ones): # set up some data with mixed masks wcs = astropy.wcs.WCS(naxis=1) - wcs.ctype = 'ONE' - wcs.cunit = 'deg' + wcs.ctype = "ONE" + wcs.cunit = "deg" wcs.cdelt = 0.1 wcs.crpix = 0 wcs.crval = 0 - wcs.cname = 'ONE' + wcs.cname = "ONE" - data_out = [] - data_out.append(("B0", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 0}))) - data_out.append(("B45", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 45}, mask=np.ones(1, dtype=bool)))) - data_out.append(("B90", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 90}, mask=np.zeros(1, dtype=bool)))) - data_out.append(("B135", NDCube(data=np.array([1]), wcs=wcs, meta={'POLAR': 135}, mask=np.zeros(1, dtype=bool)))) - fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes='all') + data_out = [ + (str(0 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 0})), + (str(45 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 45}, mask=np.ones(1, dtype=bool))), + (str(90 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 90}, mask=np.zeros(1, dtype=bool))), + (str(135 * u.degree), NDCube(data=np.array([1]), wcs=wcs, meta={"POLAR": 135}, mask=np.zeros(1, dtype=bool)))] + fourpol_ones = NDCollection(key_data_pairs=data_out, meta={}, aligned_axes="all") - actual = pol.fourpol_to_stokes(fourpol_ones) + actual = transforms.fourpol_to_stokes(fourpol_ones) - expected_data = [] - expected_data.append(("Bi", NDCube(np.array([2]), wcs=wcs, mask=np.ones(1, dtype=bool)))) - expected_data.append(("Bq", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))) - expected_data.append(("Bu", NDCube(np.array([0]), 
wcs=wcs, mask=np.ones(1, dtype=bool)))) + expected_data = [("I", NDCube(np.array([2]), wcs=wcs, mask=np.ones(1, dtype=bool))), + ("Q", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool))), + ("U", NDCube(np.array([0]), wcs=wcs, mask=np.ones(1, dtype=bool)))] expected = NDCollection(expected_data, meta={}, aligned_axes="all") for k in list(expected): assert np.allclose(actual[str(k)].data, expected[str(k)].data)
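
For reviewers, a minimal usage sketch of the renamed transforms module (illustrative only, not part of the patch). It mirrors the mzp_ones fixture and test_mzp_stokes_ones above; the WCS values are the same placeholders the fixtures use.

import astropy.units as u
import astropy.wcs
import numpy as np
from ndcube import NDCollection, NDCube

import solpolpy.transforms as transforms

# Throwaway WCS, set up the same way as in tests/fixtures.py.
wcs = astropy.wcs.WCS(naxis=3)
wcs.ctype = "WAVE", "HPLT-TAN", "HPLN-TAN"
wcs.cdelt = 0.2, 0.5, 0.4
wcs.cunit = "Angstrom", "deg", "deg"
wcs.crpix = 0, 2, 2
wcs.crval = 10, 0.5, 1

# An all-ones MZP triplet keyed "M"/"Z"/"P", built the way the new fixtures build it.
mzp = NDCollection(
    [("M", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": -60 * u.degree})),
     ("Z", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 0 * u.degree})),
     ("P", NDCube(np.array([[1]]), wcs=wcs, meta={"POLAR": 60 * u.degree}))],
    meta={}, aligned_axes="all")

stokes = transforms.mzp_to_stokes(mzp)
print(stokes["I"].data, stokes["Q"].data, stokes["U"].data)  # ~[[2.]] [[0.]] [[0.]], the values asserted in test_mzp_stokes_ones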
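
The high-level entry point now takes a System member instead of the old string kind names; a sketch under the same assumptions, reusing the mzp collection built in the sketch above. That resolve() fills in the alpha array itself when a path needs it is inferred from test_check_all_paths_resolve, which feeds it alpha-free fixtures.

import astropy.units as u

from solpolpy.core import resolve
from solpolpy.graph import System

# Convert the M/Z/P collection built above to the BpB system; resolve() walks the
# transform graph and returns another NDCollection.
bpb = resolve(mzp, System.bpb)
print(list(bpb))

# npol targets additionally need the output polarizer angles, as in the tests.
npol = resolve(mzp, System.npol, out_angles=[0] * u.deg)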
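
The new util module centralizes the mask handling used by every transform above, and the use_alpha decorator raises MissingAlphaError when a collection has no "alpha" cube (exercised by test_mzp_bp3_missing_alpha_errors). A small, self-contained sketch of the mask semantics, illustrative only:

import numpy as np

from solpolpy.util import combine_masks

a = np.array([True, False, False])
b = np.array([False, False, True])

print(combine_masks(a, b))     # element-wise logical OR -> [ True False  True]
print(combine_masks(a, None))  # any None input short-circuits the result to None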