Skip to content

Commit

Permalink
Issue #401 Improve automatic adding of save_result
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 28, 2023
1 parent 509b4a4 commit 8620d9d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 37 deletions.
123 changes: 87 additions & 36 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
from openeo.udf import XarrayDataCube


DEFAULT_RASTER_FORMAT = "GTiff"

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -1797,7 +1799,9 @@ def atmospheric_correction(
})

@openeo_process
def save_result(self, format: str = "GTiff", options: dict = None) -> 'DataCube':
def save_result(
self, format: str = DEFAULT_RASTER_FORMAT, options: Optional[dict] = None
) -> "DataCube":
formats = set(self._connection.list_output_formats().keys())
if format.lower() not in {f.lower() for f in formats}:
raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
Expand All @@ -1810,23 +1814,23 @@ def save_result(self, format: str = "GTiff", options: dict = None) -> 'DataCube'
}
)

def download(
self, outputfile: Union[str, pathlib.Path, None] = None, format: Optional[str] = None,
options: Optional[dict] = None
):
def _ensure_save_result(
self, format: Optional[str], options: Optional[dict]
) -> "DataCube":
"""
Download image collection, e.g. as GeoTIFF.
If outputfile is provided, the result is stored on disk locally, otherwise, a bytes object is returned.
The bytes object can be passed on to a suitable decoder for decoding.
Make sure there is (final) save_result node in the process graph.
If there is already one: check if it is consistent with desired format/options
and add a new one otherwise.
:param outputfile: Optional, an output file if the result needs to be stored on disk.
:param format: Optional, an output format supported by the backend.
:param options: Optional, file format options
:return: None if the result is stored to disk, or a bytes object returned by the backend.
:param format: (optional) desired `save_result` file format
:param options: (optional) desired `save_result` file format parameters
:return:
"""
if self.result_node().process_id == "save_result":
# There is already a `save_result` node: check if it is consistent with given format/options
args = self.result_node().arguments
result_node = self.result_node()
if result_node.process_id == "save_result":
# There is already a `save_result` node:
# check if it is consistent with given format/options (if any)
args = result_node.arguments
if format is not None and format.lower() != args["format"].lower():
raise ValueError(
f"Existing `save_result` node with different format {args['format']!r} != {format!r}"
Expand All @@ -1838,10 +1842,30 @@ def download(
cube = self
else:
# No `save_result` node yet: automatically add it.
if not format:
format = guess_format(outputfile) if outputfile else "GTiff"
cube = self.save_result(format=format, options=options)
cube = self.save_result(
format=format or DEFAULT_RASTER_FORMAT, options=options
)
return cube

def download(
self,
outputfile: Optional[Union[str, pathlib.Path]] = None,
format: Optional[str] = None,
options: Optional[dict] = None,
):
"""
Download image collection, e.g. as GeoTIFF.
If outputfile is provided, the result is stored on disk locally, otherwise, a bytes object is returned.
The bytes object can be passed on to a suitable decoder for decoding.
:param outputfile: Optional, an output file if the result needs to be stored on disk.
:param format: Optional, an output format supported by the backend.
:param options: Optional, file format options
:return: None if the result is stored to disk, or a bytes object returned by the backend.
"""
if format is None and outputfile is not None:
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
return self._connection.download(cube.flat_graph(), outputfile)

def validate(self) -> List[dict]:
Expand All @@ -1856,10 +1880,16 @@ def tiled_viewing_service(self, type: str, **kwargs) -> Service:
return self._connection.create_service(self.flat_graph(), type=type, **kwargs)

def execute_batch(
self,
outputfile: Union[str, pathlib.Path] = None, out_format: str = None,
print=print, max_poll_interval=60, connection_retry_interval=30,
job_options=None, **format_options) -> BatchJob:
self,
outputfile: Union[str, pathlib.Path] = None,
out_format: str = None,
print=print,
max_poll_interval=60,
connection_retry_interval=30,
job_options=None,
format_options: Optional[dict] = None,
**kwargs,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
Expand All @@ -1872,19 +1902,31 @@ def execute_batch(
:param format_options: String Parameters for the job result format
"""
if "format" in format_options and not out_format:
out_format = format_options["format"] # align with 'download' call arg name
# TODO: messy format and format_options handling here
if "format" in kwargs and not out_format:
out_format = kwargs["format"] # align with 'download' call arg name
if not out_format:
out_format = guess_format(outputfile) if outputfile else "GTiff"
job = self.create_job(out_format, job_options=job_options, **format_options)
out_format = (
guess_format(outputfile) if outputfile else DEFAULT_RASTER_FORMAT
)
job = self.create_job(
out_format, job_options=job_options, format_options=kwargs
)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
)

def create_job(
self, out_format=None, title: str = None, description: str = None, plan: str = None, budget=None,
job_options=None, **format_options
self,
out_format: Optional[str] = None,
title: Optional[str] = None,
description: Optional[str] = None,
plan: Optional[str] = None,
budget: Optional[float] = None,
job_options: Optional[dict] = None,
format_options: Optional[dict] = None,
**kwargs,
) -> BatchJob:
"""
Sends the datacube's process graph as a batch job to the back-end
Expand All @@ -1894,18 +1936,27 @@ def create_job(
it still needs to be started and tracked explicitly.
Use :py:meth:`execute_batch` instead to have the openEO Python client take care of that job management.
:param out_format: String Format of the job result.
:param job_options: A dictionary containing (custom) job options
:param format_options: String Parameters for the job result format
:return: status: Job resulting job.
:param out_format: (optional) output file format.
:param job_options: (optional) custom job options.
:param format_options: (optional) output file format parameters.
"""
# TODO: add option to also automatically start the job?
img = self
if kwargs:
warnings.warn(
"Deprecated usage of keyword arguments `create_job()` to set file format options. "
"Instead, use `format_options` argument or an explicit `save_result()` method call "
"to set file format options.",
category=UserDeprecationWarning,
)
# Legacy usage pattern of setting format options through keyword arguments
assert not format_options
format_options = kwargs

cube = self
if out_format:
# add `save_result` node
img = img.save_result(format=out_format, options=format_options)
cube = cube._ensure_save_result(format=out_format, options=format_options)
return self._connection.create_job(
process_graph=img.flat_graph(),
process_graph=cube.flat_graph(),
title=title, description=description, plan=plan, budget=budget, additional=job_options
)

Expand Down
2 changes: 1 addition & 1 deletion openeo/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def deep_set(data: dict, *keys, value):
raise ValueError("No keys given")


def guess_format(filename: Union[str, Path]):
def guess_format(filename: Union[str, Path]) -> str:
"""
Guess the output format from a given filename and return the corrected format.
Any names not in the dict get passed through.
Expand Down

0 comments on commit 8620d9d

Please sign in to comment.