Skip to content

Commit

Permalink
Issue #297 Improve compliance with 'remote-udp' extension
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jul 8, 2024
1 parent a6d8c0a commit ec8b3b0
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 63 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ and start a new "In Progress" section above it.

## In progress

## 0.107.0

- `evaluate_process_from_url`: drop support for URL guessing from folder-like URL ([#297)](https://github.com/Open-EO/openeo-python-driver/issues/297))
- `evaluate_process_from_url`: align with new (and experimental) "remote-udp" extension ([#297)](https://github.com/Open-EO/openeo-python-driver/issues/297))

## 0.106.0

Expand Down
40 changes: 18 additions & 22 deletions openeo_driver/ProcessGraphDeserializer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# TODO: rename this module to something in snake case? It doesn't even implement a ProcessGraphDeserializer class.
# TODO: and related: separate generic process graph handling from more concrete openEO process implementations

# pylint: disable=unused-argument

Expand All @@ -20,7 +21,6 @@
import openeo_processes
import pandas as pd
import pyproj
import requests
import shapely.geometry
import shapely.ops
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -51,11 +51,10 @@
ProcessParameterInvalidException,
FeatureUnsupportedException,
OpenEOApiException,
ProcessGraphInvalidException,
ProcessUnsupportedException,
CollectionNotFoundException,
)
from openeo_driver.processes import ProcessRegistry, ProcessSpec, DEFAULT_NAMESPACE, ProcessArgs
from openeo_driver.processgraph import get_process_definition_from_url
from openeo_driver.save_result import (
JSONResult,
SaveResult,
Expand Down Expand Up @@ -1590,8 +1589,9 @@ def apply_process(process_id: str, args: dict, namespace: Union[str, None], env:

# when all arguments and dependencies are resolved, we can run the process
if namespace and any(namespace.startswith(p) for p in ["http://", "https://"]):
# TODO: HTTPS only by default and config to also allow HTTP (e.g. for localhost dev and testing)
# TODO: security aspects: only allow for certain users, only allow whitelisted domains, ...?
if namespace.startswith("http://"):
_log.warning(f"HTTP protocol for namespace based remote process definitions is discouraged: {namespace!r}")
# TODO: security aspects: only allow for certain users, only allow whitelisted domains, support content hash verification ...?

return evaluate_process_from_url(
process_id=process_id, namespace=namespace, args=args, env=env
Expand Down Expand Up @@ -1859,32 +1859,28 @@ def evaluate_udp(process_id: str, udp: UserDefinedProcessMetadata, args: dict, e


def evaluate_process_from_url(process_id: str, namespace: str, args: dict, env: EvalEnv):
res = requests.get(namespace)
if res.status_code != 200:
raise ProcessUnsupportedException(process=process_id, namespace=namespace)

"""
Load remote process definition from URL (provided through `namespace` property
:param process_id: process id of process that should be available at given URL (namespace)
:param namespace: URL of process definition
"""
try:
spec = res.json()
if spec["id"].lower() != process_id.lower():
raise OpenEOApiException(
status_code=400,
code="ProcessIdMismatch",
message=f"Mismatch between expected process {process_id!r} and process {spec['id']!r} defined at {namespace!r}.",
)
process_graph = spec["process_graph"]
parameters = spec.get("parameters", [])
process_definition = get_process_definition_from_url(process_id=process_id, url=namespace)
except OpenEOApiException:
raise
except Exception as e:
_log.error(f"Failed to load process {process_id=} from {namespace=}: {e=}", exc_info=True)
raise OpenEOApiException(
status_code=400,
code="ProcessResourceInvalid",
message=f"Failed to load process {process_id!r} from {namespace!r}: {e!r}",
code="ProcessNamespaceInvalid",
message=f"Process '{process_id}' specified with invalid namespace '{namespace}': {e!r}",
) from e

return _evaluate_process_graph_process(
process_id=process_id, process_graph=process_graph, parameters=parameters, args=args, env=env
process_id=process_id,
process_graph=process_definition.process_graph,
parameters=process_definition.parameters,
args=args,
env=env,
)


Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.106.0a1"
__version__ = "0.107.0a1"
6 changes: 6 additions & 0 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,10 @@ class UserDefinedProcessMetadata(NamedTuple):
"""
Container for user-defined process metadata.
"""

# TODO: generalize this to generic openEO process definitions
# (not only "user-defined", but also remote/public process definitions)

id: str
# Note: "process_graph" is optional for multiple UDP listings (`GET /process_graphs`),
# but required for full, single UDP metadata requests (`GET /process_graphs/{process_graph_id}`)
Expand Down Expand Up @@ -695,6 +699,8 @@ class OpenEoBackendImplementation:
DEFAULT_CONFORMANCE_CLASSES = [
# general openEO conformance class
"https://api.openeo.org/1.2.0",
# Remote process definition extension
"https://api.openeo.org/extensions/remote-udp/0.1.0",
]

def __init__(
Expand Down
77 changes: 77 additions & 0 deletions openeo_driver/processgraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import logging
import requests
from typing import NamedTuple, List, Optional
from openeo_driver.errors import OpenEOApiException

_log = logging.getLogger(__name__)


class ProcessDefinition(NamedTuple):
"""
Like `UserDefinedProcessMetadata`, but with different defaults
(e.g. process graph and parameters are required).
"""

# Process id
id: str
# Flat-graph representation of the process
process_graph: dict
# List of parameters expected by the process
parameters: List[dict]
# Definition what the process returns
returns: Optional[dict] = None


def get_process_definition_from_url(process_id: str, url: str) -> ProcessDefinition:
"""
Get process definition (process graph, parameters, title, ...) from URL,
which should provide:
- a JSON document with the process definition, compatible with
the `GET /process_graphs/{process_graph_id}` openEO API endpoint.
- a JSON doc with process listing, compatible with
the `GET /process_graphs` openEO API endpoint.
"""
_log.debug(f"Trying to load process definition for {process_id=} from {url=}")
# TODO: send headers, e.g. with custom user agent?
# TODO: add/support caching. Add retrying too?
res = requests.get(url=url)
res.raise_for_status()
doc = res.json()
if not isinstance(doc, dict):
raise ValueError(f"Process definition should be a JSON object, but got {type(doc)}.")

# TODO: deeper validation (e.g. JSON Schema based)?
if "id" in doc and "process_graph" in doc:
_log.debug(f"Detected single process definition for {process_id=} at {url=}")
spec = doc
if spec["id"] != process_id:
raise OpenEOApiException(
status_code=400,
code="ProcessIdMismatch",
message=f"Mismatch between expected process {process_id!r} and process {spec['id']!r} defined at {url!r}.",
)
elif "processes" in doc and "links" in doc:
_log.debug(f"Searching for {process_id=} at process listing {url=}")
found = [
p for p in doc["processes"] if isinstance(p, dict) and p.get("id") == process_id and p.get("process_graph")
]
if len(found) != 1:
raise OpenEOApiException(
status_code=400,
code="ProcessNotFound",
message=f"Process {process_id!r} not found in process listing at {url!r}.",
)
spec = found[0]
else:
raise OpenEOApiException(
status_code=400,
code="ProcessNotFound",
message=f"No valid process definition for {process_id!r} found at {url!r}.",
)

return ProcessDefinition(
id=process_id,
process_graph=spec["process_graph"],
parameters=spec.get("parameters", []),
returns=spec.get("returns"),
)
55 changes: 55 additions & 0 deletions tests/test_processgraph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from openeo_driver.errors import OpenEOApiException
from openeo_driver.processgraph import get_process_definition_from_url

import pytest


class TestProcessDefinitionFromUrl:
PROCESS_ADD35 = {
"id": "add35",
"process_graph": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
"parameters": [],
"returns": {"schema": {"type": "number"}},
}

PROCESS_ADD3PARAM = {
"id": "add3param",
"process_graph": {
"add": {"process_id": "add", "arguments": {"x": 3, "y": {"from_parameter": "delta"}}, "result": True}
},
"parameters": [
{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}},
],
"returns": {"schema": {"type": "number"}},
}

def test_get_process_definition_from_url_single(self, requests_mock):
requests_mock.get("https://share.test/add3param.json", json=self.PROCESS_ADD3PARAM)

pd = get_process_definition_from_url("add3param", "https://share.test/add3param.json")
assert pd.id == "add3param"
assert pd.process_graph == {
"add": {"process_id": "add", "arguments": {"x": 3, "y": {"from_parameter": "delta"}}, "result": True},
}
assert pd.parameters == [{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}}]
assert pd.returns == {"schema": {"type": "number"}}

def test_get_process_definition_from_url_listing(self, requests_mock):
requests_mock.get(
"https://share.test/processes/",
json={
"processes": [
self.PROCESS_ADD35,
self.PROCESS_ADD3PARAM,
],
"links": [],
},
)

pd = get_process_definition_from_url("add3param", "https://share.test/processes/")
assert pd.id == "add3param"
assert pd.process_graph == {
"add": {"process_id": "add", "arguments": {"x": 3, "y": {"from_parameter": "delta"}}, "result": True},
}
assert pd.parameters == [{"name": "delta", "schema": {"type": "number", "optional": True, "default": 1}}]
assert pd.returns == {"schema": {"type": "number"}}
Loading

0 comments on commit ec8b3b0

Please sign in to comment.