Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensemble reduction and changes to Ensembles #63

Merged
merged 43 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
939d930
reduce.py and improvements to ensembles
RondeauG Sep 1, 2022
725220e
Merge branch 'main' into reduce
RondeauG Sep 1, 2022
1ca0b44
Docstrings
RondeauG Sep 1, 2022
9988a9d
bugfixes
RondeauG Sep 7, 2022
f4e74d2
Merge branch 'main' into reduce
RondeauG Sep 7, 2022
61b8e9b
Merge branch 'main' into reduce
Zeitsperre Sep 8, 2022
3683435
Update xscen/utils.py
RondeauG Sep 12, 2022
55d8693
bugfixes and suggestions from code review
RondeauG Sep 12, 2022
21545e6
fixed docstrings
RondeauG Sep 12, 2022
98795bc
Update xscen/ensembles.py
RondeauG Sep 12, 2022
14a42d0
Update xscen/ensembles.py
RondeauG Sep 12, 2022
1e12a4d
Update xscen/ensembles.py
RondeauG Sep 12, 2022
633255e
Update xscen/ensembles.py
RondeauG Sep 12, 2022
1c95246
suggestions from code review
RondeauG Sep 12, 2022
ddb86c5
fixed imports
RondeauG Sep 12, 2022
31698e8
rename xrkwargs to common_attrs_open_kwargs
RondeauG Sep 12, 2022
acaffba
Merge branch 'main' into reduce
RondeauG Sep 12, 2022
708e819
upd HISTORY
RondeauG Sep 12, 2022
dd0c9c0
id generation in common_attrs_only
RondeauG Sep 12, 2022
92e4808
Update xscen/utils.py
RondeauG Sep 13, 2022
5f1831d
Update xscen/ensembles.py
RondeauG Sep 15, 2022
30aba84
Update xscen/ensembles.py
RondeauG Sep 15, 2022
4866e73
Update xscen/ensembles.py
RondeauG Sep 15, 2022
e59e02c
Update xscen/ensembles.py
RondeauG Sep 15, 2022
db72851
Update xscen/reduce.py
RondeauG Sep 15, 2022
aa9282b
Merge branch 'main' into reduce
RondeauG Sep 15, 2022
656637e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 15, 2022
d3f62c6
Update xscen/ensembles.py
RondeauG Sep 15, 2022
b8f19a7
apply suggestions from code review
RondeauG Sep 15, 2022
4289708
Merge branch 'reduce' of github.com:Ouranosinc/xscen into reduce
RondeauG Sep 15, 2022
6902b1e
Update xscen/ensembles.py
RondeauG Sep 15, 2022
53593cb
Merge branch 'reduce' of github.com:Ouranosinc/xscen into reduce
RondeauG Sep 15, 2022
281791b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 15, 2022
23f7bff
fixed imports
RondeauG Sep 15, 2022
090f94b
Merge branch 'reduce' of github.com:Ouranosinc/xscen into reduce
RondeauG Sep 15, 2022
06d1b44
small fix
RondeauG Sep 15, 2022
ebbddd9
small fix
RondeauG Sep 15, 2022
63b1e69
upd notebooks and docs
RondeauG Sep 15, 2022
173db33
default typing for clusters and fig_data
RondeauG Sep 15, 2022
375fd96
Update xscen/utils.py
RondeauG Sep 15, 2022
7a9543f
removed redundant function
RondeauG Sep 15, 2022
ca976d5
prefix in get_cat_attrs
RondeauG Sep 15, 2022
9e7a387
Merge branch 'main' into reduce
RondeauG Sep 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions xscen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
extract,
indicators,
io,
reduce,
regrid,
scripting,
utils,
Expand All @@ -27,6 +28,7 @@
from .extract import extract_dataset, search_data_catalogs # noqa
from .indicators import compute_indicators # noqa
from .io import save_to_netcdf, save_to_zarr # noqa
from .reduce import build_reduction_data, reduce_ensemble
from .regrid import *
from .scripting import (
TimeoutException,
Expand Down
56 changes: 54 additions & 2 deletions xscen/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"generate_id",
"parse_directory",
"parse_from_ds",
"unstack_id",
]


Expand Down Expand Up @@ -1184,20 +1185,71 @@ def _parse_date(date, fmts):
return date


def generate_id(df: Union[pd.DataFrame, xr.Dataset], id_columns: Optional[list] = None):
    """Utility to create an ID from column entries.

    Parameters
    ----------
    df: pd.DataFrame, xr.Dataset
        Data for which to create an ID. For a Dataset, the ID is built from the
        'cat:'-prefixed global attributes.
    id_columns : list
        List of column names on which to base the dataset definition. Empty columns will be skipped.
        If None (default), uses :py:data:`ID_COLUMNS`.

    Returns
    -------
    pd.Series
        One underscore-joined ID string per row of `df`.
    """
    if isinstance(df, xr.Dataset):
        # Build a one-row DataFrame out of the dataset's catalog attributes,
        # stripping the "cat:" prefix (4 characters) from each key.
        cat_attrs = {
            key[4:]: [value]
            for key, value in df.attrs.items()
            if key.startswith("cat:")
        }
        df = pd.DataFrame.from_dict(cat_attrs)

    # Restrict to the requested columns that actually exist in the data.
    candidates = id_columns or ID_COLUMNS
    id_columns = [col for col in candidates if col in df.columns]

    def _join_parts(row):
        # Skip missing entries; join everything else with underscores.
        return "_".join(str(val) for val in row.values if pd.notna(val))

    return df[id_columns].apply(_join_parts, axis=1)


def unstack_id(df: Union[pd.DataFrame, ProjectCatalog, DataCatalog]) -> dict:
    """
    Utility that reverse-engineers an ID using catalog entries.

    Parameters
    ----------
    df: Union[pd.DataFrame, ProjectCatalog, DataCatalog]
        Either a Project/DataCatalog or the pandas DataFrame.

    Returns
    -------
    dict
        Dictionary with one entry per unique ID, which are themselves dictionaries of all the individual parts of the ID.

    Raises
    ------
    ValueError
        If multiple rows sharing the same ID disagree on any of the columns that make up the ID.
    """

    if isinstance(df, (ProjectCatalog, DataCatalog)):
        df = df.df

    out = {}
    for ids in pd.unique(df["id"]):
        subset = df[df["id"] == ids]

        # Only keep the columns whose (first) value appears as an underscore-delimited
        # component of the ID. re.escape guards against values containing regex
        # metacharacters (e.g. '+', '.', parentheses), which would otherwise
        # mis-match or raise.
        subset = subset[
            [
                col
                for col in subset.columns
                if bool(
                    re.search(
                        f"((_)|(^)){re.escape(str(subset[col].iloc[0]))}((_)|($))",
                        ids,
                    )
                )
            ]
        ].drop("id", axis=1)

        # Make sure that all elements are the same, if there are multiple lines.
        # NOTE: the previous check used `Series.is_unique`, which is True when all
        # values are *distinct* — the opposite of what is intended. `nunique() == 1`
        # asserts that each column holds a single common value for this ID.
        if len(subset) > 1:
            if not all(subset[col].nunique() == 1 for col in subset.columns):
                raise ValueError(
                    "Not all elements of the columns are the same for a given ID!"
                )

        out[ids] = {attr: subset[attr].iloc[0] for attr in subset.columns}

    return out
221 changes: 177 additions & 44 deletions xscen/ensembles.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
import inspect
import logging
from copy import deepcopy
from pathlib import Path
from typing import Any
from typing import Any, Union

import pandas as pd
import numpy as np
import xarray as xr
import xclim as xc
from xclim import ensembles

from .catalog import generate_id # ProjectCatalog
from .catalog import generate_id
from .config import parse_config
from .utils import clean_up
RondeauG marked this conversation as resolved.
Show resolved Hide resolved

logger = logging.getLogger(__name__)

__all__ = ["ensemble_stats"]
__all__ = ["ensemble_stats", "generate_weights"]


@parse_config
def ensemble_stats(
datasets: Any,
statistics: dict,
*,
create_kwargs: dict = None,
statistics: str = "ensemble_percentiles",
stats_kwargs: dict = None,
weights: xr.DataArray = None,
common_attrs_only: bool = True,
to_level: str = "ensemble",
stats_kwargs=None,
) -> xr.Dataset:
"""
Create ensemble and calculate statistics on it.
Creates an ensemble and computes statistics on it.

Parameters
----------
Expand All @@ -31,13 +39,13 @@ def ensemble_stats(
A dictionary can be passed instead of a list, in which case the keys are used as coordinates along the new
`realization` axis.
Tip: With a project catalog, you can do: `datasets = pcat.search(**search_dict).to_dataset_dict()`.
statistics: dict
xclim.ensembles statistics to be called. Dictionary in the format {function: arguments}.
If a function requires 'ref', the dictionary entry should be the inputs of a .loc[], e.g. {"ref": {"horizons": "1981-2010"}}
create_kwargs: dict
Dictionary of arguments for xclim.ensembles.create_ensemble.
statistics: str
Name of the xclim.ensemble function to call on the ensemble
(e.g. "ensemble_percentiles","ensemble_mean_std_max_min")
stats_kwargs: dict
RondeauG marked this conversation as resolved.
Show resolved Hide resolved
Dictionary of arguments for the statistics function.
weights: xr.DataArray
Weights to apply along the 'realization' dimension. This array cannot contain missing values.
common_attrs_only:
If True, keeps only the global attributes that are the same for all datasets and generate new id.
If False, keeps global attrs of the first dataset (same behaviour as xclim.ensembles.create_ensemble)
Expand All @@ -49,11 +57,17 @@ def ensemble_stats(
xr.Dataset
Dataset with ensemble statistics
"""
if stats_kwargs is None:
stats_kwargs = {}
if create_kwargs is None:
create_kwargs = {}
logger.info(f"Creating ensemble with {len(datasets)} and calculating {statistics}.")
create_kwargs = create_kwargs or {}
logger.info(
f"Creating ensemble with {len(datasets)} simulations and calculating {statistics}."
)

if isinstance(statistics, str) and isinstance(stats_kwargs, dict):
logger.warning(
"DeprecationWarning: The usage of 'statistics: str' with 'stats_kwargs: dict' will be abandoned. Please use 'statistics: dict' instead."
)
RondeauG marked this conversation as resolved.
Show resolved Hide resolved
statistics = {statistics: stats_kwargs}
stats_kwargs = None

# if input files are .zarr, change the engine automatically
if isinstance(datasets, list) and isinstance(datasets[0], (str, Path)):
Expand All @@ -62,38 +76,157 @@ def ensemble_stats(
create_kwargs["engine"] = "zarr"

ens = ensembles.create_ensemble(datasets, **create_kwargs)
ens_stats = getattr(ensembles, statistics)(ens, **stats_kwargs)

ens_stats = xr.Dataset(attrs=ens.attrs)
for stat in statistics.keys():
stats_kwargs = deepcopy(statistics.get(stat, None) or {})
RondeauG marked this conversation as resolved.
Show resolved Hide resolved
if (
weights is not None
and "weights" in inspect.getfullargspec(getattr(ensembles, stat))[0]
):
stats_kwargs["weights"] = weights.reindex_like(ens.realization)
if "ref" in stats_kwargs:
stats_kwargs["ref"] = ens.loc[stats_kwargs["ref"]]

if stat == "change_significance":
for v in ens.data_vars:
with xc.set_options(keep_attrs=True):
RondeauG marked this conversation as resolved.
Show resolved Hide resolved
deltak = ens[v].attrs.get("delta_kind", None)
if stats_kwargs.get("ref", None):
RondeauG marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
"{v} is a delta, but 'ref' was still specified."
)
if deltak in ["relative", "*", "/"]:
logging.info(
"Relative delta detected for {v}. Applying 'v - 1' before change_significance."
)
ens_v = ens[v] - 1
else:
ens_v = ens[v]
ens_stats[f"{v}_change_frac"], ens_stats[f"{v}_pos_frac"] = getattr(
ensembles, stat
)(ens_v, **stats_kwargs)
else:
ens_stats = ens_stats.merge(getattr(ensembles, stat)(ens, **stats_kwargs))

# delete the realization coordinate if there
if "realization" in ens_stats:
ens_stats = ens_stats.drop_vars("realization")

# delete attrs that are not common to all dataset
if common_attrs_only:
for i in range(len(datasets)):
if isinstance(datasets[i], (str, Path)):
# if they exist remove attrs specific to create_ensemble
create_kwargs.pop("mf_flag", None)
create_kwargs.pop("resample_freq", None)
create_kwargs.pop("calendar", None)
create_kwargs.pop("preprocess", None)
ds = xr.open_dataset(datasets[i], **create_kwargs)
else:
ds = datasets[i]
attributes = ens_stats.attrs.copy()
for a_key, a_val in attributes.items():
if (
(a_key not in ds.attrs)
or (a_key in ["cat:date_start", "cat:date_end"])
or (a_val != ds.attrs[a_key])
):
del ens_stats.attrs[a_key]
# create dataframe of catalogue attrs to generate new id
df = pd.DataFrame.from_dict(
{
key[4:]: [value]
for key, value in ens_stats.attrs.items()
if key.startswith("cat:")
}
# if they exist remove attrs specific to create_ensemble
create_kwargs.pop("mf_flag", None)
create_kwargs.pop("resample_freq", None)
create_kwargs.pop("calendar", None)
create_kwargs.pop("preprocess", None)

ens_stats = clean_up(
ds=ens_stats, common_attrs_only=datasets, xrkwargs=create_kwargs
)
ens_stats.attrs["cat:id"] = generate_id(df)[0]

# generate new id
ens_stats.attrs["cat:id"] = generate_id(ens_stats).iloc[0]

ens_stats.attrs["cat:processing_level"] = to_level

return ens_stats


def generate_weights(
    datasets: Union[dict, list],
    independence_level: str = "all",
) -> xr.DataArray:
    """
    Uses realization attributes to automatically generate weights along the 'realization' dimension.

    Parameters
    ----------
    datasets: dict
        List of Dataset objects that will be included in the ensemble.
        The datasets should include attributes to help recognize them - 'cat:activity','cat:source', and 'cat:driving_model' for regional models.
        A dictionary can be passed instead of a list, in which case the keys are used for the 'realization' coordinate.
        Tip: With a project catalog, you can do: `datasets = pcat.search(**search_dict).to_dataset_dict()`.
    independence_level: str
        'all': Weights using the method '1 model - 1 Vote', where every unique combination of 'source' and 'driving_model' is considered a model.
        'GCM': Weights using the method '1 GCM - 1 Vote'

    Returns
    -------
    xr.DataArray
        Weights along the 'realization' dimension.

    Raises
    ------
    ValueError
        If `independence_level` is not one of ['all', 'GCM'], or if `datasets` is a list of paths
        (metadata attributes are required to infer weights).
    """

    # TODO: 2-D weights along the horizon dimension

    # Validate early: an unknown independence_level previously left `divisor`
    # unbound and crashed with a NameError in the middle of the loop.
    if independence_level not in ("all", "GCM"):
        raise ValueError(
            f"'independence_level' should be one of ['all', 'GCM'], received '{independence_level}'."
        )

    if (isinstance(datasets, list)) and (isinstance(datasets[0], (str, Path))):
        raise ValueError(
            "explicit weights are required if the dataset is a list of paths"
        )

    # Use metadata to identify the simulation attributes ('cat:' prefix stripped).
    info = {}
    keys = datasets.keys() if isinstance(datasets, dict) else range(len(datasets))
    for key in keys:
        info[key] = {
            attr.replace("cat:", ""): datasets[key].attrs[attr]
            for attr in datasets[key].attrs
            if "cat:" in attr
        }

    # Prepare an array of 0s, with size == nb. realization
    weights = xr.DataArray(
        [0] * len(keys), coords={"realization": ("realization", list(keys))}
    )

    for r in weights.realization:
        r = r.item()

        # Weight == 0 means it hasn't been processed yet
        if weights.sel(realization=r) == 0:
            sim = info[r]

            # Exact group corresponding to the current simulation:
            # same source, driving model and activity.
            group = [
                k
                for k in info.keys()
                if (info[k].get("source", None) == sim.get("source", None))
                and (
                    info[k].get("driving_model", None) == sim.get("driving_model", None)
                )
                and (info[k].get("activity", None) == sim.get("activity", None))
            ]

            if independence_level == "GCM":
                # The GCM is the driving model for regional simulations,
                # otherwise the source itself.
                if ("driving_model" in sim.keys()) and not (
                    str(sim.get("driving_model", None)) in (["nan", "None"])
                ):
                    gcm = sim.get("driving_model", None)
                else:
                    gcm = sim.get("source", None)

                # Global models
                group_g = [k for k in info.keys() if info[k].get("source", None) == gcm]
                # Regional models with the same GCM
                group_r = [
                    k for k in info.keys() if info[k].get("driving_model", None) == gcm
                ]

                # Divide the weight equally between the GCMs and RCMs
                divisor = 1 / ((len(group_g) > 0) + (len(group_r) > 0))

                # For regional models, divide between the unique RCM sources
                if r in group_r:
                    divisor = divisor / len(
                        np.unique([info[k].get("source", None) for k in group_r])
                    )

            else:  # independence_level == "all"
                divisor = 1

            # Assign the same weight to every member of the exact group at once.
            weights = weights.where(
                ~weights.realization.isin(group),
                divisor / len(group),
            )

    return weights
Loading