diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d14c5f3..21be51f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,10 @@ and start a new "In Progress" section above it. ## In progress +## 0.107.0 + - `evaluate_process_from_url`: drop support for URL guessing from folder-like URL ([#297)](https://github.com/Open-EO/openeo-python-driver/issues/297)) +- `evaluate_process_from_url`: align with new (and experimental) "remote-udp" extension ([#297](https://github.com/Open-EO/openeo-python-driver/issues/297)) ## 0.106.0 diff --git a/openeo_driver/ProcessGraphDeserializer.py b/openeo_driver/ProcessGraphDeserializer.py index 9ee1ac6e..eaed4858 100644 --- a/openeo_driver/ProcessGraphDeserializer.py +++ b/openeo_driver/ProcessGraphDeserializer.py @@ -1,4 +1,5 @@ # TODO: rename this module to something in snake case? It doesn't even implement a ProcessGraphDeserializer class. +# TODO: and related: separate generic process graph handling from more concrete openEO process implementations # pylint: disable=unused-argument @@ -20,7 +21,6 @@ import openeo_processes import pandas as pd import pyproj -import requests import shapely.geometry import shapely.ops from dateutil.relativedelta import relativedelta @@ -51,11 +51,10 @@ ProcessParameterInvalidException, FeatureUnsupportedException, OpenEOApiException, - ProcessGraphInvalidException, - ProcessUnsupportedException, CollectionNotFoundException, ) from openeo_driver.processes import ProcessRegistry, ProcessSpec, DEFAULT_NAMESPACE, ProcessArgs +from openeo_driver.processgraph import get_process_definition_from_url from openeo_driver.save_result import ( JSONResult, SaveResult, @@ -1590,8 +1589,9 @@ def apply_process(process_id: str, args: dict, namespace: Union[str, None], env: # when all arguments and dependencies are resolved, we can run the process if namespace and any(namespace.startswith(p) for p in ["http://", "https://"]): - # TODO: HTTPS only by default and config to also allow HTTP (e.g. 
def evaluate_process_from_url(process_id: str, namespace: str, args: dict, env: EvalEnv):
    """
    Evaluate a process whose definition is hosted remotely, at the URL given as `namespace`.

    :param process_id: id of the process that is expected to be defined at the URL
    :param namespace: URL of the process definition
    :param args: arguments to evaluate the process with
    :param env: evaluation environment, passed on as-is to the process graph evaluation
    :return: result of `_evaluate_process_graph_process` for the loaded process graph
    :raises OpenEOApiException: when the process definition could not be loaded from the URL
    """
    try:
        definition = get_process_definition_from_url(process_id=process_id, url=namespace)
    except Exception as exc:
        if isinstance(exc, OpenEOApiException):
            # Already a proper API error: pass it on untouched.
            raise
        # Any other failure while loading the definition is reported
        # as a client-side (400) problem with the given namespace.
        raise OpenEOApiException(
            status_code=400,
            code="ProcessNamespaceInvalid",
            message=f"Process '{process_id}' specified with invalid namespace '{namespace}': {exc!r}",
        ) from exc

    remote_graph = definition.process_graph
    remote_parameters = definition.parameters
    return _evaluate_process_graph_process(
        process_id=process_id,
        process_graph=remote_graph,
        parameters=remote_parameters,
        args=args,
        env=env,
    )
class ProcessDefinition(NamedTuple):
    """
    Like `UserDefinedProcessMetadata`, but with different defaults
    (e.g. process graph and parameters are required).
    """

    # Process id
    id: str
    # Flat-graph representation of the process
    process_graph: dict
    # List of parameters expected by the process
    parameters: List[dict]
    # Definition what the process returns
    returns: Optional[dict] = None

    @classmethod
    def from_dict(cls, spec: dict, process_id: Optional[str] = None) -> "ProcessDefinition":
        """
        Build a `ProcessDefinition` from a process definition document (dict).

        :param spec: process definition document, which must have a "process_graph" field
            (and an "id" field if `process_id` is not given).
        :param process_id: id to use for the process; defaults to the "id" field of `spec`.
        :raises KeyError: when a required field is missing from `spec`.
        """
        return cls(
            id=spec["id"] if process_id is None else process_id,
            process_graph=spec["process_graph"],
            # "parameters" can be absent, but also explicitly `null` in the JSON document:
            # normalize both to an empty list so that the field is always a list.
            parameters=spec.get("parameters") or [],
            returns=spec.get("returns"),
        )


def get_process_definition_from_url(process_id: str, url: str, timeout: Optional[float] = 30) -> ProcessDefinition:
    """
    Get process definition (process graph, parameters, title, ...) from URL,
    which should provide either:
    - a JSON document with a single process definition, compatible with
      the `GET /process_graphs/{process_graph_id}` openEO API endpoint.
    - a JSON document with a process listing, compatible with
      the `GET /process_graphs` openEO API endpoint.

    :param process_id: id of the process to load
    :param url: URL of the process definition or process listing
    :param timeout: timeout (in seconds) of the HTTP request; `None` disables the timeout
    :return: definition of the requested process
    :raises OpenEOApiException: when the document defines a process with another id,
        or when the process is not found (exactly once) in the document.
    :raises ValueError: when the JSON document is not a JSON object.

    Failures raised by `requests` (e.g. from `raise_for_status()` or `json()`) are not caught here.
    """
    _log.debug(f"Trying to load process definition for {process_id=} from {url=}")
    # TODO: send headers, e.g. with custom user agent?
    # TODO: add/support caching. Add retrying too?
    # Explicit timeout: without it, `requests` would wait indefinitely on an unresponsive host.
    res = requests.get(url=url, timeout=timeout)
    res.raise_for_status()
    doc = res.json()
    if not isinstance(doc, dict):
        raise ValueError(f"Process definition should be a JSON object, but got {type(doc)}.")

    # TODO: deeper validation (e.g. JSON Schema based)?
    if "id" in doc and "process_graph" in doc:
        _log.debug(f"Detected single process definition for {process_id=} at {url=}")
        spec = doc
        if spec["id"] != process_id:
            raise OpenEOApiException(
                status_code=400,
                code="ProcessIdMismatch",
                message=f"Mismatch between expected process {process_id!r} and process {spec['id']!r} defined at {url!r}.",
            )
    elif "processes" in doc and "links" in doc:
        _log.debug(f"Searching for {process_id=} at process listing {url=}")
        # Only consider entries that actually come with a (non-empty) process graph.
        found = [
            p for p in doc["processes"] if isinstance(p, dict) and p.get("id") == process_id and p.get("process_graph")
        ]
        # Exactly one match is required: duplicate entries are ambiguous
        # and are rejected the same way as a missing entry.
        if len(found) != 1:
            raise OpenEOApiException(
                status_code=400,
                code="ProcessNotFound",
                message=f"Process {process_id!r} not found in process listing at {url!r}.",
            )
        spec = found[0]
    else:
        raise OpenEOApiException(
            status_code=400,
            code="ProcessNotFound",
            message=f"No valid process definition for {process_id!r} found at {url!r}.",
        )

    return ProcessDefinition.from_dict(spec, process_id=process_id)
{"type": "number"}}, + } + + def test_get_process_definition_from_url_single(self, requests_mock): + requests_mock.get("https://share.test/add3param.json", json=self.PROCESS_ADD3PARAM) + + pd = get_process_definition_from_url("add3param", "https://share.test/add3param.json") + assert pd.id == "add3param" + assert pd.process_graph == { + "add": {"process_id": "add", "arguments": {"x": 3, "y": {"from_parameter": "delta"}}, "result": True}, + } + assert pd.parameters == [{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}}] + assert pd.returns == {"schema": {"type": "number"}} + + def test_get_process_definition_from_url_listing(self, requests_mock): + requests_mock.get( + "https://share.test/processes/", + json={ + "processes": [ + self.PROCESS_ADD35, + self.PROCESS_ADD3PARAM, + ], + "links": [], + }, + ) + + pd = get_process_definition_from_url("add3param", "https://share.test/processes/") + assert pd.id == "add3param" + assert pd.process_graph == { + "add": {"process_id": "add", "arguments": {"x": 3, "y": {"from_parameter": "delta"}}, "result": True}, + } + assert pd.parameters == [{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}}] + assert pd.returns == {"schema": {"type": "number"}} diff --git a/tests/test_views_execute.py b/tests/test_views_execute.py index d189d929..821377b9 100644 --- a/tests/test_views_execute.py +++ b/tests/test_views_execute.py @@ -11,6 +11,7 @@ from typing import Iterable, Optional from unittest import mock, skip from zipfile import ZipFile +import http.client import geopandas as gpd import numpy as np @@ -2685,72 +2686,125 @@ def test_discard_result(api): assert res.json is None -@pytest.mark.parametrize(["namespace", "url_mocks", "expected_error"], [ - ( - "https://oeo.test/u/42/udp/bbox_mol", - {"https://oeo.test/u/42/udp/bbox_mol": "udp/bbox_mol.json"}, +@pytest.mark.parametrize( + ["namespace", "url_mocks", "expected_error"], + [ + pytest.param( + 
"https://share.test/u42/bbox_mol.json", + {"https://share.test/u42/bbox_mol.json": "udp/bbox_mol.json"}, None, - ), - ( - "https://share.example/u42/bbox_mol.json", - {"https://share.example/u42/bbox_mol.json": "udp/bbox_mol.json"}, + id="basic", + ), + pytest.param( + "https://share.test/u42/bbox_mol", + {"https://share.test/u42/bbox_mol": "udp/bbox_mol.json"}, None, - ), - ( + id="simple-no-extension", + ), + pytest.param( "https://share.test/u42/bbox_mol.json", { "https://share.test/u42/bbox_mol.json": (302, "https://shr976.test/45435"), "https://shr976.test/45435": "udp/bbox_mol.json", }, None, - ), - ( - "https://oeo.test/u/42/udp/bbox_mol", - {"https://oeo.test/u/42/udp/bbox_mol": 404}, + id="redirect", + ), + pytest.param( + "https://share.test/u42/", + { + "https://share.test/u42/": { + "processes": [ + {"id": "foo", "process_graph": {}}, + load_json("pg/1.0/udp/bbox_mol.json"), + ], + "links": [], + } + }, + None, + id="process-listing", + ), + pytest.param( + "https://share.test/u42/bbox_mol.json", + {"https://share.test/u42/bbox_mol.json": 404}, ( - 400, "ProcessUnsupported", - "'bbox_mol' is not available in namespace 'https://oeo.test/u/42/udp/bbox_mol'." 
+ 400, + "ProcessNamespaceInvalid", + "Process 'bbox_mol' specified with invalid namespace 'https://share.test/u42/bbox_mol.json': HTTPError('404 Client Error: Not Found for url: https://share.test/u42/bbox_mol.json')", ), - ), - ( - "https://oeo.test/u/42/udp/bbox_mol", - {"https://oeo.test/u/42/udp/bbox_mol": {"foo": "bar"}}, + id="error-404", + ), + pytest.param( + "https://share.test/u42/bbox_mol.json", + {"https://share.test/u42/bbox_mol.json": "[1,2,3]"}, ( 400, - "ProcessResourceInvalid", - "Failed to load process 'bbox_mol' from 'https://oeo.test/u/42/udp/bbox_mol': KeyError('id')", + "ProcessNamespaceInvalid", + "Process 'bbox_mol' specified with invalid namespace 'https://share.test/u42/bbox_mol.json': ValueError(\"Process definition should be a JSON object, but got .\")", ), + id="error-no-dict", ), - ( - "https://oeo.test/u/42/udp/bbox_mol", - {"https://oeo.test/u/42/udp/bbox_mol": '{"foo": invalid json'}, + pytest.param( + "https://share.test/u42/bbox_mol.json", + {"https://share.test/u42/bbox_mol.json": {"foo": "bar"}}, ( 400, - "ProcessResourceInvalid", - "Failed to load process 'bbox_mol' from 'https://oeo.test/u/42/udp/bbox_mol': JSONDecodeError", + "ProcessNotFound", + "No valid process definition for 'bbox_mol' found at 'https://share.test/u42/bbox_mol.json'.", ), + id="error-no-id", ), - ( - "https://share.example/u42/bbox_mol.json", + pytest.param( + "https://share.test/u42/bbox_mol.json", + {"https://share.test/u42/bbox_mol.json": '{"foo": invalid json'}, + ( + 400, + "ProcessNamespaceInvalid", + "Process 'bbox_mol' specified with invalid namespace 'https://share.test/u42/bbox_mol.json': JSONDecodeError('Expecting value: line 1 column 9 (char 8)')", + ), + id="error-invalid-json", + ), + pytest.param( + "https://share.test/u42/bbox_mol.json", { - "https://share.example/u42/bbox_mol.json": load_json( + "https://share.test/u42/bbox_mol.json": load_json( "pg/1.0/udp/bbox_mol.json", preprocess=lambda t: t.replace("bbox_mol", "BBox_Mol") ) }, - 
None, + ( + 400, + "ProcessIdMismatch", + "Mismatch between expected process 'bbox_mol' and process 'BBox_Mol' defined at 'https://share.test/u42/bbox_mol.json'.", + ), + id="error-id-mismatch-capitalization", ), - ( - "https://share.example/u42/bbox_mol.json", + pytest.param( + "https://share.test/u42/bbox_mol.json", { - "https://share.example/u42/bbox_mol.json": load_json( + "https://share.test/u42/bbox_mol.json": load_json( "pg/1.0/udp/bbox_mol.json", preprocess=lambda t: t.replace("bbox_mol", "BoundingBox-Mol") ) }, ( 400, "ProcessIdMismatch", - "Mismatch between expected process 'bbox_mol' and process 'BoundingBox-Mol' defined at 'https://share.example/u42/bbox_mol.json'.", + "Mismatch between expected process 'bbox_mol' and process 'BoundingBox-Mol' defined at 'https://share.test/u42/bbox_mol.json'.", ), + id="error-id-mismatch-different-id", + ), + pytest.param( + "https://share.test/u42/", + { + "https://share.test/u42/": { + "processes": [ + {"id": "foo", "process_graph": {}}, + {"id": "bar", "process_graph": {}}, + ], + "links": [], + } + }, + (400, "ProcessNotFound", "Process 'bbox_mol' not found in process listing at 'https://share.test/u42/'."), + id="process-listing-missing", ), ], ) @@ -2765,7 +2819,7 @@ def test_evaluate_process_from_url(api, requests_mock, namespace, url_mocks, exp elif isinstance(value, dict): requests_mock.get(url, json=value) elif value in [404, 500]: - requests_mock.get(url, status_code=value) + requests_mock.get(url, status_code=value, reason=http.client.responses.get(value)) elif isinstance(value, tuple) and value[0] in [302]: status_code, target = value requests_mock.get(url, status_code=status_code, headers={"Location": target}) @@ -2773,10 +2827,18 @@ def test_evaluate_process_from_url(api, requests_mock, namespace, url_mocks, exp raise ValueError(value) # Evaluate process graph (with URL namespace) - pg = api.load_json("udp_bbox_mol_basic.json") - assert pg["bboxmol1"]["process_id"] == "bbox_mol" - 
pg["bboxmol1"]["namespace"] = namespace - + pg = { + "loadcollection1": { + "process_id": "load_collection", + "arguments": {"id": "S2_FOOBAR"}, + }, + "bboxmol1": { + "process_id": "bbox_mol", + "namespace": namespace, + "arguments": {"data": {"from_node": "loadcollection1"}}, + "result": True, + }, + } res = api.result(pg) if expected_error: status_code, error_code, message = expected_error