Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Ouranos folder structure #127

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion miranda/cv.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pyessv

VALIDATION_ENABLED = True
except OSError:
except (OSError, ModuleNotFoundError):
warnings.warn(
"Source files for pyessv-archive files not present. Data validation checks will be skipped."
)
Expand Down
8 changes: 8 additions & 0 deletions miranda/decode/_time.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from logging import config

from pandas import Timedelta
from pandas._libs.tslibs import NaTType # noqa

from miranda.scripting import LOGGING_CONFIG
Expand All @@ -12,6 +13,7 @@
"FREQUENCY_TO_POTENTIAL_TIME_UNITS",
"TIME_UNITS_TO_FREQUENCY",
"TIME_UNITS_TO_TIMEDELTA",
"freq_to_timedelta",
]

TIME_UNITS_TO_FREQUENCY = {
Expand Down Expand Up @@ -65,6 +67,7 @@
FREQUENCY_TO_POTENTIAL_TIME_UNITS.setdefault(value, list()).append(key)

TIME_UNITS_TO_TIMEDELTA = {
"fx": "nan",
"hourly": "1h",
"hours": "1h",
"hour": "1h",
Expand Down Expand Up @@ -105,5 +108,10 @@
}


def freq_to_timedelta(freq: str) -> Timedelta:
"""Convert a frequency string to a representative timedelta object."""
return Timedelta(TIME_UNITS_TO_TIMEDELTA[freq])


class DecoderError(Exception):
pass
219 changes: 129 additions & 90 deletions miranda/structure/_structure.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import hashlib
import logging.config
import multiprocessing
import operator as op
import os
import shutil
import sys
from functools import partial
from functools import partial, reduce
from pathlib import Path
from types import GeneratorType
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Tuple, Union

import yaml
from pandas import isna
from schema import SchemaError

from miranda.convert.utils import date_parser
from miranda.cv import VALIDATION_ENABLED
from miranda.decode import Decoder, DecoderError, guess_project
from miranda.decode import Decoder, DecoderError, freq_to_timedelta, guess_project
from miranda.io import discover_data
from miranda.scripting import LOGGING_CONFIG

Expand Down Expand Up @@ -154,7 +157,7 @@ def create_version_hash_files(

def _structure_datasets(
in_file: Union[str, os.PathLike],
out_path: Union[str, os.PathLike],
out_file: Union[str, os.PathLike],
method: str,
dry_run: bool = False,
):
Expand All @@ -163,7 +166,7 @@ def _structure_datasets(

if method.lower() in ["move", "copy"]:
meth = "Moved" if method.lower() == "move" else "Copied"
output_file = Path(out_path).joinpath(in_file.name)
output_file = Path(out_file)
try:
if not dry_run:
method_mod = ""
Expand All @@ -181,9 +184,97 @@ def _structure_datasets(
print(f"{in_file.name} already exists at location. Continuing...")


def _parse_option(option: dict, facets: dict):
"""Parse an option element of the facet schema tree."""
facet_value = facets[option["option"]]
if "value" in option:
if isinstance(option["value"], str):
answer = facet_value == option["value"]
else: # A list
answer = facet_value in option["value"]
else:
answer = not isna(facet_value)

if "is_true" in option and answer:
return option["is_true"]
if "else" in option and not answer:
return option["else"]
return answer


def _parse_level(schema: Union[dict, str], facets: dict):
if isinstance(schema, str):
# A single facet:
if isna(facets[schema]):
raise ValueError(f"Schema requires a value for facet {schema}.")
return facets[schema]
if "option" in schema:
answer = _parse_option(schema, facets)
if isinstance(answer, bool) and not answer:
# Test failed with no "else" value, we skip this level.
return None
return _parse_level(answer, facets)
if "concat" in schema:
parts = []
for element in schema["concat"]:
part = _parse_level(element, facets)
if not isna(part):
parts.append(part)
return "_".join(parts)
if "text" in schema:
return schema["text"]
raise ValueError(f"Invalid schema : {schema}")


def _parse_dates(facets):
if facets["frequency"] == "fx":
return "fx"

start = date_parser(facets["date_start"], output_type="datetime")
end = date_parser(facets["date_end"], output_type="datetime")
freq = freq_to_timedelta(facets["frequency"])

# Full years : Starts on Jan 1st and is either annual or ends on Dec 31st (accepting Dec 30 for 360 cals)
if (
start.month == 1
and start.day == 1
and (freq >= freq_to_timedelta("yr") or (end.month == 12 and end.day > 29))
):
if start.year == end.year:
return f"{start:%Y}"
return f"{start:%Y}-{end:%Y}"
# Full months : Starts on the 1st and is either montly or ends on the last day
if start.day == 1 and (freq >= freq_to_timedelta("mon") or end.day > 27):
# Full months
if (start.year, start.month) == (end.year, end.month):
return f"{start:%Y%m}"
return f"{start:%Y%m}-{end:%Y%m}"
# The full range
return f"{start:%Y%m%d}-{end:%Y%m%d}"


def _parse_filename(schema: list, facets: dict) -> str:
return "_".join(
[
facets[element] if element != "dates" else _parse_dates(facets)
for element in schema
if element == "dates" or not isna(facets[element])
]
)


def _parse_structure(schema: list, facets: dict) -> list:
folder_tree = list()
for level in schema:
part = _parse_level(level, facets)
if not isna(part):
folder_tree.append(part)
return folder_tree


def parse_schema(
facets: dict, schema: Union[str, os.PathLike, dict], top_folder: str = "datasets"
) -> list:
) -> Tuple[List[str], str]:
"""Parse the schema from a YAML schema configuration and construct path using a dictionary of facets.

Parameters
Expand All @@ -194,80 +285,32 @@ def parse_schema(

Returns
-------
list
list of folders, filename without suffix
"""

def _parse_top_level(schematic: dict, facet_dict: dict, top: str):
try:
parent = schematic[top]
except KeyError:
logging.error("Schema is not a valid facet-tree reference.")
raise

for i, options in enumerate(parent):
if {"option", "structure", "value"}.issubset(options.keys()):
option = options["option"]
value = options["value"]

if option in facet_dict.keys():
if facet_dict[option] == value:
return {"branch": value, "structure": options["structure"]}
continue
raise ValueError("Couldn\nt parse top level.")

def _parse_structure(branch_dict: dict, facet_dict: dict) -> list:
structure = branch_dict.get("structure")
folder_tree = list()

for level in structure:
if isinstance(level, str):
folder_tree.append(level)
continue
elif isinstance(level, dict):
if {"option", "is_true"}.issubset(level.keys()):
option = level["option"]

if option not in facet_dict and "value" in level:
raise ValueError(
f"Necessary facet not found for schema: `{option}`."
)

is_true = level.get("is_true")
else_value = level.get("else")
facet = facet_dict.get(option)

if "value" not in level:
# The absence of "value" means that "is_true / else" refer to the presence or not of "option" in the facets
# We also treat falsy values (empty string, None) as the absence of "option" from the facets
if not bool(facet) and else_value:
folder_tree.append(else_value)
elif bool(facet):
folder_tree.append(is_true)
else:
# "option" absent from the facets and no "else": skip.
pass
else:
value = level["value"]
if facet_dict[option] == value:
folder_tree.append(is_true)
elif else_value:
folder_tree.append(else_value)
else:
# "option" not equal to "value", but no "else" : skip.
pass
else:
raise ValueError("Supplied schema is invalid.")
return folder_tree

if isinstance(schema, (str, os.PathLike)):
with Path(schema).open() as f:
schema = yaml.safe_load(f.read())

branch = _parse_top_level(schema, facets, top_folder)
tree = list() # noqa
tree.extend(_parse_structure(branch, facets))
try:
parent = schema[top_folder]
except KeyError:
logging.error("Schema is not a valid facet-tree reference.")
raise

return tree
for i, structure in enumerate(parent):
if {"with", "structure", "filename"} != set(structure.keys()):
raise ValueError("Invalid schema specification.")

match = reduce(
op.and_, map(partial(_parse_option, facets=facets), structure["with"])
)
if match:
return _parse_structure(structure["structure"], facets), _parse_filename(
structure["filename"], facets
)
raise ValueError(
f"This file doesn't match any registered structure. Facets:\n{facets}"
)


def build_path_from_schema(
Expand Down Expand Up @@ -295,24 +338,18 @@ def build_path_from_schema(
Returns
-------
Path or None
The Path includes the filename, without suffix.
"""

if schema is None:
schema = Path(__file__).parent.joinpath("data").joinpath("ouranos_schema.yml")

tree = parse_schema(facets, schema, top_folder)
branch = tree[0]

if validate and VALIDATION_ENABLED:
if facets[branch] in validation_schemas.keys():
if facets["type"] in validation_schemas.keys():
try:
validation_schemas[facets[branch]].validate(facets)
validation_schemas[facets["type"]].validate(facets)
except SchemaError as e:
logging.error(
f"Validation issues found for file matching schema: {facets}: {e}"
)
return
elif facets[branch] not in validation_schemas.keys():
elif facets["type"] not in validation_schemas.keys():
logging.error(
f"No appropriate data schemas found for file matching schema: {facets}",
DecoderError,
Expand All @@ -323,11 +360,11 @@ def build_path_from_schema(
"Facets validation requires pyessv-archive source files. Skipping validation checks."
)

file_location = list()
for facet in tree:
# Remove spaces in folder paths
file_location.append(str(facets[facet]).replace(" ", "-"))
return Path(output_folder).joinpath("/".join(file_location))
if schema is None:
schema = Path(__file__).parent.joinpath("data").joinpath("ouranos_schema.yml")

tree, filename = parse_schema(facets, schema, top_folder)
return Path(output_folder).joinpath("/".join(tree)) / filename


def structure_datasets(
Expand Down Expand Up @@ -396,7 +433,9 @@ def structure_datasets(
for file, facets in decoder.file_facets().items():
output_filepath = build_path_from_schema(facets, output_folder)
if isinstance(output_filepath, Path):
all_file_paths.update({Path(file): output_filepath})
all_file_paths.update(
{Path(file): output_filepath.with_suffix(Path(file).suffix)}
)
else:
errored_files.append(Path(file).name)
continue
Expand Down
Loading