Skip to content

Commit

Permalink
PR #630 improve save_result handling in VectorCube too (#623, #401, #583
Browse files Browse the repository at this point in the history
, #391)
  • Loading branch information
soxofaan committed Sep 25, 2024
1 parent deaa8b1 commit 855a4ca
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 96 deletions.
35 changes: 35 additions & 0 deletions openeo/rest/_datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,38 @@ def build_child_callback(
raise ValueError(process)

return PGNode.to_process_graph_argument(pg)


def _ensure_save_result(
    cube: _ProcessGraphAbstraction,
    *,
    format: Optional[str] = None,
    options: Optional[dict] = None,
    weak_format: Optional[str] = None,
    default_format: str,
    method: str,
) -> _ProcessGraphAbstraction:
    """
    Guarantee that the process graph of the given cube contains a `save_result` node.

    When no `save_result` node is found, one is added through ``cube.save_result()``,
    using the first non-empty value of ``format``, ``weak_format`` and ``default_format``
    (in that order) as file format.
    When there already is a `save_result` node, the cube is returned as is,
    unless an explicit format or options are given as well, which is rejected as ambiguous.

    :param cube: cube-like object to inspect
        (has to provide ``result_node()`` and, when a node has to be added, ``save_result()``)
    :param format: (optional) desired `save_result` file format
    :param options: (optional) desired `save_result` file format parameters
    :param weak_format: (optional) weak format indicator guessed from file name
    :param default_format: default format for data type to use when no format is specified by user
    :param method: name of the calling method, only used to build the error message
    :return: the given cube, or the result of ``cube.save_result()`` when a node had to be added
    :raises OpenEoClientException: when ``format`` or ``options`` is given
        while the process graph already contains `save_result` node(s)
    """
    # TODO #278 instead of standalone helper function, move this to common base class for raster cubes, vector cubes, ...
    existing_nodes = [node for node in cube.result_node().walk_nodes() if node.process_id == "save_result"]

    if existing_nodes:
        # Explicit format/options can not be reconciled with `save_result` node(s) that are already there.
        if format or options:
            raise OpenEoClientException(
                f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
                f" but the process graph already has `save_result` node(s)"
                f" which is ambiguous and should not be combined."
            )
        return cube

    # No `save_result` node yet: automatically add it.
    # TODO: the `save_result` method is not defined on _ProcessGraphAbstraction, but it is on DataCube and VectorCube
    effective_format = format or weak_format or default_format
    return cube.save_result(format=effective_format, options=options)
66 changes: 24 additions & 42 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from openeo.rest._datacube import (
THIS,
UDF,
_ensure_save_result,
_ProcessGraphAbstraction,
build_child_callback,
)
Expand Down Expand Up @@ -2095,39 +2096,6 @@ def save_result(
}
)

def _ensure_save_result(
    self,
    *,
    format: Optional[str] = None,
    options: Optional[dict] = None,
    weak_format: Optional[str] = None,
    method: str,
) -> DataCube:
    """
    Guarantee that the process graph of this cube contains a `save_result` node.

    When no `save_result` node is found, one is added through ``self.save_result()``,
    using the first non-empty value of ``format``, ``weak_format``
    and ``self._DEFAULT_RASTER_FORMAT`` (in that order) as file format.
    When there already is a `save_result` node, this cube is returned as is,
    unless an explicit format or options are given as well, which is rejected as ambiguous.

    :param format: (optional) desired `save_result` file format
    :param options: (optional) desired `save_result` file format parameters
    :param weak_format: (optional) fallback format, used when ``format`` is not given
    :param method: name of the calling method, only used to build the error message
    :return: this cube, or the result of ``self.save_result()`` when a node had to be added
    :raises OpenEoClientException: when ``format`` or ``options`` is given
        while the process graph already contains `save_result` node(s)
    """
    # TODO #401 Unify with VectorCube._ensure_save_result and move to generic data cube parent class (not only for raster cubes, but also vector cubes)
    existing_nodes = [node for node in self.result_node().walk_nodes() if node.process_id == "save_result"]

    if existing_nodes:
        # Explicit format/options can not be reconciled with `save_result` node(s) that are already there.
        if format or options:
            raise OpenEoClientException(
                f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
                f" but the process graph already has `save_result` node(s)"
                f" which is ambiguous and should not be combined."
            )
        return self

    # No `save_result` node yet: automatically add it.
    effective_format = format or weak_format or self._DEFAULT_RASTER_FORMAT
    return self.save_result(format=effective_format, options=options)

def download(
self,
outputfile: Optional[Union[str, pathlib.Path]] = None,
Expand All @@ -2149,9 +2117,14 @@ def download(
(overruling the connection's ``auto_validate`` setting).
:return: None if the result is stored to disk, or a bytes object returned by the backend.
"""
weak_format = guess_format(outputfile) if outputfile else None
cube = self._ensure_save_result(
format=format, options=options, weak_format=weak_format, method="DataCube.download()"
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)

Expand Down Expand Up @@ -2274,9 +2247,14 @@ def execute_batch(
if "format" in format_options and not out_format:
out_format = format_options["format"] # align with 'download' call arg name

weak_format = guess_format(outputfile) if outputfile else None
cube = self._ensure_save_result(
format=out_format, options=format_options, weak_format=weak_format, method="DataCube.execute_batch()"
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.execute_batch()",
)

job = cube.create_job(job_options=job_options, validate=validate)
Expand Down Expand Up @@ -2320,9 +2298,13 @@ def create_job(
"""
# TODO: add option to also automatically start the job?
# TODO: avoid using all kwargs as format_options
# TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
cube = self._ensure_save_result(
format=out_format, options=format_options or None, method="DataCube.create_job()"
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.create_job()",
)
return self._connection.create_job(
process_graph=cube.flat_graph(),
Expand Down
73 changes: 29 additions & 44 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from openeo.rest._datacube import (
THIS,
UDF,
_ensure_save_result,
_ProcessGraphAbstraction,
build_child_callback,
)
Expand All @@ -37,6 +38,8 @@ class VectorCube(_ProcessGraphAbstraction):
A geometry is specified in a 'coordinate reference system'. https://www.w3.org/TR/sdw-bp/#dfn-coordinate-reference-system-(crs)
"""

_DEFAULT_VECTOR_FORMAT = "GeoJSON"

def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CubeMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata = metadata
Expand Down Expand Up @@ -195,38 +198,6 @@ def save_result(self, format: Union[str, None] = "GeoJSON", options: dict = None
},
)

def _ensure_save_result(
    self,
    format: Optional[str] = None,
    options: Optional[dict] = None,
) -> VectorCube:
    """
    Guarantee that the process graph of this cube ends with a `save_result` node.

    When the result node is not a `save_result` node, one is added through ``self.save_result()``
    (with "GeoJSON" as format when none is given).
    When the result node already is a `save_result` node, its arguments are compared
    with the given format (case-insensitively) and options, and this cube is returned as is
    if nothing conflicts.

    :param format: (optional) desired `save_result` file format
    :param options: (optional) desired `save_result` file format parameters
    :return: this cube, or the result of ``self.save_result()`` when a node had to be added
    :raises ValueError: when the given format or options differ from
        the arguments of the existing `save_result` node
    """
    # TODO #401 Unify with DataCube._ensure_save_result and move to generic data cube parent class
    final_node = self.result_node()

    if final_node.process_id != "save_result":
        # No `save_result` node yet: automatically add it.
        return self.save_result(format=format or "GeoJSON", options=options)

    # There is already a `save_result` node:
    # check if it is consistent with given format/options (if any)
    args = final_node.arguments
    format_conflict = format is not None and format.lower() != args["format"].lower()
    if format_conflict:
        raise ValueError(f"Existing `save_result` node with different format {args['format']!r} != {format!r}")
    options_conflict = options is not None and options != args["options"]
    if options_conflict:
        raise ValueError(
            f"Existing `save_result` node with different options {args['options']!r} != {options!r}"
        )
    return self

def execute(self, *, validate: Optional[bool] = None) -> dict:
"""Executes the process graph."""
return self._connection.execute(self.flat_graph(), validate=validate)
Expand Down Expand Up @@ -255,11 +226,15 @@ def download(
When not specified explicitly, output format is guessed from output file extension.
"""
# TODO #401 make outputfile optional (See DataCube.download)
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
if format is None and outputfile:
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile=outputfile, validate=validate)

def execute_batch(
Expand Down Expand Up @@ -291,11 +266,15 @@ def execute_batch(
.. versionchanged:: 0.21.0
When not specified explicitly, output format is guessed from output file extension.
"""
if out_format is None and outputfile:
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
out_format = guess_format(outputfile)

job = self.create_job(out_format, job_options=job_options, validate=validate, **format_options)
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.execute_batch()",
)
job = cube.create_job(job_options=job_options, validate=validate)
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
Expand Down Expand Up @@ -331,8 +310,14 @@ def create_job(
:return: Created job.
"""
# TODO: avoid using all kwargs as format_options
# TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
cube = self._ensure_save_result(format=out_format, options=format_options or None)
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.create_job()",
)
return self._connection.create_job(
process_graph=cube.flat_graph(),
title=title,
Expand Down
7 changes: 5 additions & 2 deletions openeo/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,15 @@ def deep_set(data: dict, *keys, value):
raise ValueError("No keys given")


def guess_format(filename: Union[str, Path]) -> str:
def guess_format(filename: Union[str, Path]) -> Union[str, None]:
"""
Guess the output format from a given filename and return the corrected format.
Any names not in the dict get passed through.
"""
extension = str(filename).rsplit(".", 1)[-1].lower()
extension = Path(filename).suffix
if not extension:
return None
extension = extension[1:].lower()

format_map = {
"gtiff": "GTiff",
Expand Down
Loading

0 comments on commit 855a4ca

Please sign in to comment.