diff --git a/openeo/rest/_datacube.py b/openeo/rest/_datacube.py
index f0afeca47..6a8ffe1da 100644
--- a/openeo/rest/_datacube.py
+++ b/openeo/rest/_datacube.py
@@ -319,3 +319,38 @@ def build_child_callback(
         raise ValueError(process)
 
     return PGNode.to_process_graph_argument(pg)
+
+
+def _ensure_save_result(
+    cube: _ProcessGraphAbstraction,
+    *,
+    format: Optional[str] = None,
+    options: Optional[dict] = None,
+    weak_format: Optional[str] = None,
+    default_format: str,
+    method: str,
+) -> _ProcessGraphAbstraction:
+    """
+    Make sure there is a `save_result` node in the process graph.
+
+    :param format: (optional) desired `save_result` file format
+    :param options: (optional) desired `save_result` file format parameters
+    :param weak_format: (optional) weak format indicator guessed from file name
+    :param default_format: default format for data type to use when no format is specified by user
+    :return:
+    """
+    # TODO #278 instead of standalone helper function, move this to common base class for raster cubes, vector cubes, ...
+    save_result_nodes = [n for n in cube.result_node().walk_nodes() if n.process_id == "save_result"]
+
+    if not save_result_nodes:
+        # No `save_result` node yet: automatically add it.
+        # TODO: the `save_result` method is not defined on _ProcessGraphAbstraction, but it is on DataCube and VectorCube
+        cube = cube.save_result(format=format or weak_format or default_format, options=options)
+    elif format or options:
+        raise OpenEoClientException(
+            f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
+            f" but the process graph already has `save_result` node(s)"
+            f" which is ambiguous and should not be combined."
+        )
+
+    return cube
diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py
index 7ea6b5054..e9ad729ca 100644
--- a/openeo/rest/datacube.py
+++ b/openeo/rest/datacube.py
@@ -46,6 +46,7 @@
 from openeo.rest._datacube import (
     THIS,
     UDF,
+    _ensure_save_result,
     _ProcessGraphAbstraction,
     build_child_callback,
 )
@@ -2095,39 +2096,6 @@ def save_result(
             }
         )
 
-    def _ensure_save_result(
-        self,
-        *,
-        format: Optional[str] = None,
-        options: Optional[dict] = None,
-        weak_format: Optional[str] = None,
-        method: str,
-    ) -> DataCube:
-        """
-        Make sure there is a (final) `save_result` node in the process graph.
-        If there is already one: check if it is consistent with the given format/options (if any)
-        and add a new one otherwise.
-
-        :param format: (optional) desired `save_result` file format
-        :param options: (optional) desired `save_result` file format parameters
-        :return:
-        """
-        # TODO #401 Unify with VectorCube._ensure_save_result and move to generic data cube parent class (not only for raster cubes, but also vector cubes)
-        save_result_nodes = [n for n in self.result_node().walk_nodes() if n.process_id == "save_result"]
-
-        cube = self
-        if not save_result_nodes:
-            # No `save_result` node yet: automatically add it.
-            cube = cube.save_result(format=format or weak_format or self._DEFAULT_RASTER_FORMAT, options=options)
-        elif format or options:
-            raise OpenEoClientException(
-                f"{method} with explicit output {'format' if format else 'options'} {format or options!r},"
-                f" but the process graph already has `save_result` node(s)"
-                f" which is ambiguous and should not be combined."
-            )
-
-        return cube
-
     def download(
         self,
         outputfile: Optional[Union[str, pathlib.Path]] = None,
@@ -2149,9 +2117,14 @@
             (overruling the connection's ``auto_validate`` setting).
 
         :return: None if the result is stored to disk, or a bytes object returned by the backend.
         """
-        weak_format = guess_format(outputfile) if outputfile else None
-        cube = self._ensure_save_result(
-            format=format, options=options, weak_format=weak_format, method="DataCube.download()"
+        # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
+        cube = _ensure_save_result(
+            cube=self,
+            format=format,
+            options=options,
+            weak_format=guess_format(outputfile) if outputfile else None,
+            default_format=self._DEFAULT_RASTER_FORMAT,
+            method="DataCube.download()",
         )
         return self._connection.download(cube.flat_graph(), outputfile, validate=validate)
@@ -2274,9 +2247,14 @@ def execute_batch(
 
         if "format" in format_options and not out_format:
             out_format = format_options["format"]  # align with 'download' call arg name
-        weak_format = guess_format(outputfile) if outputfile else None
-        cube = self._ensure_save_result(
-            format=out_format, options=format_options, weak_format=weak_format, method="DataCube.execute_batch()"
+        # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
+        cube = _ensure_save_result(
+            cube=self,
+            format=out_format,
+            options=format_options,
+            weak_format=guess_format(outputfile) if outputfile else None,
+            default_format=self._DEFAULT_RASTER_FORMAT,
+            method="DataCube.execute_batch()",
         )
         job = cube.create_job(job_options=job_options, validate=validate)
 
@@ -2320,9 +2298,13 @@ def create_job(
         """
         # TODO: add option to also automatically start the job?
         # TODO: avoid using all kwargs as format_options
-        # TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
-        cube = self._ensure_save_result(
-            format=out_format, options=format_options or None, method="DataCube.create_job()"
+        # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
+        cube = _ensure_save_result(
+            cube=self,
+            format=out_format,
+            options=format_options or None,
+            default_format=self._DEFAULT_RASTER_FORMAT,
+            method="DataCube.create_job()",
         )
         return self._connection.create_job(
             process_graph=cube.flat_graph(),
diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py
index 206130bfc..7f383cedb 100644
--- a/openeo/rest/vectorcube.py
+++ b/openeo/rest/vectorcube.py
@@ -16,6 +16,7 @@
 from openeo.rest._datacube import (
     THIS,
     UDF,
+    _ensure_save_result,
     _ProcessGraphAbstraction,
     build_child_callback,
 )
@@ -37,6 +38,8 @@ class VectorCube(_ProcessGraphAbstraction):
     A geometry is specified in a 'coordinate reference system'.
     https://www.w3.org/TR/sdw-bp/#dfn-coordinate-reference-system-(crs)
     """
+    _DEFAULT_VECTOR_FORMAT = "GeoJSON"
+
     def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CubeMetadata] = None):
         super().__init__(pgnode=graph, connection=connection)
         self.metadata = metadata
@@ -195,38 +198,6 @@ def save_result(self, format: Union[str, None] = "GeoJSON", options: dict = None
             },
         )
 
-    def _ensure_save_result(
-        self,
-        format: Optional[str] = None,
-        options: Optional[dict] = None,
-    ) -> VectorCube:
-        """
-        Make sure there is a (final) `save_result` node in the process graph.
-        If there is already one: check if it is consistent with the given format/options (if any)
-        and add a new one otherwise.
-
-        :param format: (optional) desired `save_result` file format
-        :param options: (optional) desired `save_result` file format parameters
-        :return:
-        """
-        # TODO #401 Unify with DataCube._ensure_save_result and move to generic data cube parent class
-        result_node = self.result_node()
-        if result_node.process_id == "save_result":
-            # There is already a `save_result` node:
-            # check if it is consistent with given format/options (if any)
-            args = result_node.arguments
-            if format is not None and format.lower() != args["format"].lower():
-                raise ValueError(f"Existing `save_result` node with different format {args['format']!r} != {format!r}")
-            if options is not None and options != args["options"]:
-                raise ValueError(
-                    f"Existing `save_result` node with different options {args['options']!r} != {options!r}"
-                )
-            cube = self
-        else:
-            # No `save_result` node yet: automatically add it.
-            cube = self.save_result(format=format or "GeoJSON", options=options)
-        return cube
-
     def execute(self, *, validate: Optional[bool] = None) -> dict:
         """Executes the process graph."""
         return self._connection.execute(self.flat_graph(), validate=validate)
@@ -255,11 +226,15 @@ def download(
 
             When not specified explicitly, output format is guessed from output file extension.
         """
-        # TODO #401 make outputfile optional (See DataCube.download)
-        # TODO #401/#449 don't guess/override format if there is already a save_result with format?
-        if format is None and outputfile:
-            format = guess_format(outputfile)
-        cube = self._ensure_save_result(format=format, options=options)
+        # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
+        cube = _ensure_save_result(
+            cube=self,
+            format=format,
+            options=options,
+            weak_format=guess_format(outputfile) if outputfile else None,
+            default_format=self._DEFAULT_VECTOR_FORMAT,
+            method="VectorCube.download()",
+        )
         return self._connection.download(cube.flat_graph(), outputfile=outputfile, validate=validate)
 
     def execute_batch(
@@ -291,11 +266,15 @@
         .. versionchanged:: 0.21.0
             When not specified explicitly, output format is guessed from output file extension.
         """
-        if out_format is None and outputfile:
-            # TODO #401/#449 don't guess/override format if there is already a save_result with format?
-            out_format = guess_format(outputfile)
-
-        job = self.create_job(out_format, job_options=job_options, validate=validate, **format_options)
+        cube = _ensure_save_result(
+            cube=self,
+            format=out_format,
+            options=format_options,
+            weak_format=guess_format(outputfile) if outputfile else None,
+            default_format=self._DEFAULT_VECTOR_FORMAT,
+            method="VectorCube.execute_batch()",
+        )
+        job = cube.create_job(job_options=job_options, validate=validate)
         return job.run_synchronous(
             # TODO #135 support multi file result sets too
             outputfile=outputfile,
@@ -331,8 +310,14 @@
         :return: Created job.
         """
         # TODO: avoid using all kwargs as format_options
-        # TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
-        cube = self._ensure_save_result(format=out_format, options=format_options or None)
+        # TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
+        cube = _ensure_save_result(
+            cube=self,
+            format=out_format,
+            options=format_options or None,
+            default_format=self._DEFAULT_VECTOR_FORMAT,
+            method="VectorCube.create_job()",
+        )
         return self._connection.create_job(
             process_graph=cube.flat_graph(),
             title=title,
diff --git a/openeo/util.py b/openeo/util.py
index 762bf5f7c..6bbd4d897 100644
--- a/openeo/util.py
+++ b/openeo/util.py
@@ -416,12 +416,15 @@ def deep_set(data: dict, *keys, value):
         raise ValueError("No keys given")
 
 
-def guess_format(filename: Union[str, Path]) -> str:
+def guess_format(filename: Union[str, Path]) -> Union[str, None]:
     """
     Guess the output format from a given filename and return the corrected format.
     Any names not in the dict get passed through.
     """
-    extension = str(filename).rsplit(".", 1)[-1].lower()
+    extension = Path(filename).suffix
+    if not extension:
+        return None
+    extension = extension[1:].lower()
 
     format_map = {
         "gtiff": "GTiff",
diff --git a/tests/rest/datacube/test_vectorcube.py b/tests/rest/datacube/test_vectorcube.py
index 06dd25c40..757de537a 100644
--- a/tests/rest/datacube/test_vectorcube.py
+++ b/tests/rest/datacube/test_vectorcube.py
@@ -7,6 +7,7 @@
 import openeo.processes
 from openeo.api.process import Parameter
+from openeo.rest import OpenEoClientException
 from openeo.rest._testing import DummyBackend, build_capabilities
 from openeo.rest.connection import Connection
 from openeo.rest.vectorcube import VectorCube
 
@@ -98,7 +99,7 @@ def test_download_auto_save_result_only_file(
 
 
 @pytest.mark.parametrize(
-    ["filename", "format", "expected_format"],
+    ["filename", "execute_format", "expected_format"],
     [
         ("result.json", "JSON", "JSON"),
         ("result.geojson", "GeoJSON", "GeoJSON"),
@@ -113,13 +114,13 @@
 )
 @pytest.mark.parametrize("exec_mode", ["sync", "batch"])
 def test_download_auto_save_result_with_format(
-    vector_cube, dummy_backend, tmp_path, filename, format, expected_format, exec_mode
+    vector_cube, dummy_backend, tmp_path, filename, execute_format, expected_format, exec_mode
 ):
     output_path = tmp_path / filename
     if exec_mode == "sync":
-        vector_cube.download(output_path, format=format)
+        vector_cube.download(output_path, format=execute_format)
     elif exec_mode == "batch":
-        vector_cube.execute_batch(outputfile=output_path, out_format=format)
+        vector_cube.execute_batch(outputfile=output_path, out_format=execute_format)
     else:
         raise ValueError(exec_mode)
 
@@ -173,20 +174,21 @@ def test_download_auto_save_result_with_options(vector_cube, dummy_backend, tmp_
 
 
 @pytest.mark.parametrize(
-    ["output_file", "format", "expected_format"],
+    ["output_file", "save_result_format", "expected_format"],
     [
         ("result.geojson", None, "GeoJSON"),
         ("result.geojson", "GeoJSON", "GeoJSON"),
         ("result.json", "JSON", "JSON"),
         ("result.nc", "netCDF", "netCDF"),
+        ("result.data", "netCDF", "netCDF"),
     ],
 )
 @pytest.mark.parametrize("exec_mode", ["sync", "batch"])
-def test_save_result_and_download(
-    vector_cube, dummy_backend, tmp_path, output_file, format, expected_format, exec_mode
+def test_save_result_and_download_filename(
+    vector_cube, dummy_backend, tmp_path, output_file, save_result_format, expected_format, exec_mode
 ):
     """e.g. https://github.com/Open-EO/openeo-geopyspark-driver/issues/477"""
-    vector_cube = vector_cube.save_result(format=format)
+    vector_cube = vector_cube.save_result(format=save_result_format)
     output_path = tmp_path / output_file
     if exec_mode == "sync":
         vector_cube.download(output_path)
@@ -209,6 +211,108 @@
     assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
 
 
+@pytest.mark.parametrize(
+    ["save_result_format", "execute_format", "output_file", "expected"],
+    [
+        (None, None, None, "GeoJSON"),
+        (None, None, "result.geojson", "GeoJSON"),
+        ("GeoJSON", None, None, "GeoJSON"),
+        (None, "GeoJSON", None, "GeoJSON"),
+        (
+            "GeoJSON",
+            "GeoJSON",
+            None,
+            OpenEoClientException(
+                "VectorCube.download() with explicit output format 'GeoJSON', but the process graph already has `save_result` node(s) which is ambiguous and should not be combined."
+            ),
+        ),
+        (None, None, "result.nc", "netCDF"),
+        ("netCDF", None, None, "netCDF"),
+        (None, "netCDF", None, "netCDF"),
+        (
+            "GeoJson",
+            "netCDF",
+            None,
+            OpenEoClientException(
+                "VectorCube.download() with explicit output format 'netCDF', but the process graph already has `save_result` node(s) which is ambiguous and should not be combined."
+            ),
+        ),
+    ],
+)
+def test_save_result_and_download_with_format(
+    vector_cube, dummy_backend, tmp_path, save_result_format, execute_format, output_file, expected
+):
+    if save_result_format:
+        vector_cube = vector_cube.save_result(format=save_result_format)
+    output_path = tmp_path / (output_file or "data")
+
+    def do_it():
+        vector_cube.download(output_path, format=execute_format)
+
+    if isinstance(expected, Exception):
+        with pytest.raises(type(expected), match=re.escape(str(expected))):
+            do_it()
+    else:
+        do_it()
+        assert dummy_backend.get_pg()["saveresult1"] == {
+            "process_id": "save_result",
+            "arguments": {"data": {"from_node": "loadgeojson1"}, "format": expected, "options": {}},
+            "result": True,
+        }
+        assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
+
+
+@pytest.mark.parametrize(
+    ["save_result_format", "execute_format", "output_file", "expected"],
+    [
+        (None, None, None, "GeoJSON"),
+        (None, None, "result.geojson", "GeoJSON"),
+        ("GeoJSON", None, None, "GeoJSON"),
+        (None, "GeoJSON", None, "GeoJSON"),
+        (
+            "GeoJSON",
+            "GeoJSON",
+            None,
+            OpenEoClientException(
+                "VectorCube.execute_batch() with explicit output format 'GeoJSON', but the process graph already has `save_result` node(s) which is ambiguous and should not be combined."
+            ),
+        ),
+        (None, None, "result.nc", "netCDF"),
+        ("netCDF", None, None, "netCDF"),
+        (None, "netCDF", None, "netCDF"),
+        (
+            "GeoJson",
+            "netCDF",
+            None,
+            OpenEoClientException(
+                "VectorCube.execute_batch() with explicit output format 'netCDF', but the process graph already has `save_result` node(s) which is ambiguous and should not be combined."
+            ),
+        ),
+    ],
+)
+def test_save_result_and_execute_batch_with_format(
+    vector_cube, dummy_backend, tmp_path, save_result_format, execute_format, output_file, expected
+):
+    if save_result_format:
+        vector_cube = vector_cube.save_result(format=save_result_format)
+    output_path = tmp_path / (output_file or "data")
+
+    def do_it():
+        vector_cube.execute_batch(outputfile=output_path, out_format=execute_format)
+
+    if isinstance(expected, Exception):
+        with pytest.raises(type(expected), match=re.escape(str(expected))):
+            do_it()
+    else:
+        do_it()
+        assert dummy_backend.get_pg()["saveresult1"] == {
+            "process_id": "save_result",
+            "arguments": {"data": {"from_node": "loadgeojson1"}, "format": expected, "options": {}},
+            "result": True,
+        }
+        assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
+
+
 @pytest.mark.parametrize(
     "data",
     [
diff --git a/tests/test_util.py b/tests/test_util.py
index 1c148c2ef..d1e71ce74 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -612,6 +612,9 @@ def test_guess_format():
     assert guess_format("/folder/file.png") == "PNG"
     assert guess_format("../folder/file.notaformat") == "NOTAFORMAT"
+    assert guess_format("../folder/data") is None
+    assert guess_format("../folder/data.tmp.nc") == "netCDF"
+
 
 
 class TestLazyLoadCache:
     def test_basic(self):
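Usage sketch (illustrative, not part of the patch itself): with the shared `_ensure_save_result()` helper, `download()`, `execute_batch()` and `create_job()` on both `DataCube` and `VectorCube` resolve the output format the same way. The cube variable and file names below are assumptions for illustration only; the behaviour mirrors the new tests above.

    # No `save_result` node in the graph yet: one is added automatically, using the
    # explicit `format` argument if given, else the format guessed from the file name
    # (e.g. ".nc" -> "netCDF"), else the per-class default (e.g. _DEFAULT_VECTOR_FORMAT = "GeoJSON").
    vector_cube.download("result.nc")

    # A `save_result` node is already present and no explicit format/options are given:
    # the existing node is kept as-is, regardless of the output file name.
    vector_cube.save_result(format="netCDF").download("result.data")

    # A `save_result` node is already present *and* an explicit format (or options) is given:
    # this is now rejected as ambiguous instead of being silently overridden.
    vector_cube.save_result(format="GeoJSON").download("out", format="netCDF")
    # -> OpenEoClientException: "VectorCube.download() with explicit output format 'netCDF', ..."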
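Similarly, the `guess_format()` change is easiest to read from the new assertions in `tests/test_util.py`: the extension is now taken via `Path(filename).suffix`, so a name without any extension yields `None` (previously the whole name was treated as the extension), while known suffixes keep mapping to their canonical spelling and unknown ones are still passed through. A small illustrative sketch:

    from openeo.util import guess_format

    guess_format("../folder/data.tmp.nc")    # "netCDF" (only the final ".nc" suffix matters)
    guess_format("../folder/data")           # None -> callers fall back to a default format
    guess_format("/folder/file.notaformat")  # "NOTAFORMAT" (unknown extensions pass through)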