Skip to content

Commit

Permalink
Support job_options in synchronous processing (experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed May 2, 2024
1 parent addcdb3 commit 0709e42
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 28 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ and start a new "In Progress" section above it.

## In progress


## 0.99.0

- Support `job_options` in synchronous processing (experimental)
(related to [Open-EO/openeo-geopyspark-driver#531](https://github.com/Open-EO/openeo-geopyspark-driver/issues/531), eu-cdse/openeo-cdse-infra#114)

## 0.98.0

- Add `job_options` argument to `OpenEoBackendImplementation.request_costs()` API.
It's optional and unused for now, but allows openeo-geopyspark-driver to adapt already.
(related to [Open-EO/openeo-geopyspark-driver#531](https://github.com/Open-EO/openeo-geopyspark-driver/issues/531))
(related to [Open-EO/openeo-geopyspark-driver#531](https://github.com/Open-EO/openeo-geopyspark-driver/issues/531), eu-cdse/openeo-cdse-infra#114)

## 0.97.0

Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.98.0a1"
__version__ = "0.99.0a1"
2 changes: 1 addition & 1 deletion openeo_driver/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(self, spec_root: Path = SPECS_ROOT / 'openeo-processes/1.x', argume
# Expected argument names that process function signature should start with
self._argument_names = argument_names

def _key(self, name: str, namespace: str = DEFAULT_NAMESPACE) -> tuple:
def _key(self, name: str, namespace: str = DEFAULT_NAMESPACE) -> Tuple[str, str]:
"""Lookup key for in `_processes` dict"""
return namespace, name

Expand Down
59 changes: 42 additions & 17 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import re
import textwrap
from collections import namedtuple, defaultdict
from typing import Callable, Tuple, List, Optional
import typing
from typing import Callable, Tuple, List, Optional, Union

import flask
import flask_cors
Expand Down Expand Up @@ -588,6 +589,18 @@ def _extract_process_graph(post_data: dict) -> dict:
return pg


def _extract_job_options(post_data: dict, to_ignore: typing.Container[str]) -> Union[dict, None]:
"""
Extract "job options" from request data:
look for an explicit "job_options" property or collect non-predefined top-level properties
"""
if "job_options" in post_data:
# Return explicit "job_options" property as is
return post_data["job_options"]
# Collect all non-deny-listed top-level properties
return {k: v for k, v in post_data.items() if k not in to_ignore} or None


def register_views_processing(
blueprint: Blueprint, backend_implementation: OpenEoBackendImplementation, api_endpoint: EndpointRegistry,
auth_handler: HttpAuthHandler
Expand Down Expand Up @@ -615,18 +628,28 @@ def validation():
def result(user: User):
post_data = request.get_json()
process_graph = _extract_process_graph(post_data)
budget = post_data.get("budget")
plan = post_data.get("plan")
log_level = post_data.get("log_level")
job_options = _extract_job_options(
post_data, to_ignore=["process", "process_graph", "budget", "plan", "log_level"]
)

request_id = FlaskRequestCorrelationIdLogging.get_request_id()

env = EvalEnv({
"backend_implementation": backend_implementation,
'version': g.api_version,
'pyramid_levels': 'highest',
'user': user,
'require_bounds': True,
'correlation_id': request_id,
'node_caching': False
})
env = EvalEnv(
{
"backend_implementation": backend_implementation,
"version": g.api_version,
"pyramid_levels": "highest",
"user": user,
"require_bounds": True,
"correlation_id": request_id,
"node_caching": False,
# TODO: more explicit way of passing the job_options instead of putting it in the evaluation env?
"job_options": job_options,
}
)

try:
try:
Expand Down Expand Up @@ -659,14 +682,18 @@ def result(user: User):
result = to_save_result(data=result)
response = result.create_flask_response()

costs = backend_implementation.request_costs(success=True, user=user, request_id=request_id)
costs = backend_implementation.request_costs(
success=True, user=user, request_id=request_id, job_options=job_options
)
if costs:
# TODO not all costs are accounted for so don't expose in "OpenEO-Costs" yet
response.headers["OpenEO-Costs-experimental"] = costs

except Exception:
# TODO: also send "OpenEO-Costs" header on failure
backend_implementation.request_costs(success=False, user=user, request_id=request_id)
backend_implementation.request_costs(
success=False, user=user, request_id=request_id, job_options=job_options
)
raise

# Add request id as "OpenEO-Identifier" like we do for batch jobs.
Expand Down Expand Up @@ -823,12 +850,10 @@ def create_job(user: User):
# TODO: preserve original non-process_graph process fields too?
process = {"process_graph": _extract_process_graph(post_data)}
# TODO: this "job_options" is not part of official API. See https://github.com/Open-EO/openeo-api/issues/276
job_options = post_data.get("job_options")
job_options = _extract_job_options(
post_data, to_ignore=["process", "process_graph", "title", "description", "plan", "budget", "log_level"]
)
metadata_keywords = ["title", "description", "plan", "budget"]
if("job_options" not in post_data):
job_options = {k:v for (k,v) in post_data.items() if k not in metadata_keywords + ["process","process_graph"]}
if len(job_options)==0:
job_options = None
job_info = backend_implementation.batch_jobs.create_job(
**filter_supported_kwargs(
callable=backend_implementation.batch_jobs.create_job,
Expand Down
66 changes: 58 additions & 8 deletions tests/test_views_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from openeo_driver.dummy import dummy_backend
from openeo_driver.dummy.dummy_backend import DummyVisitor
from openeo_driver.errors import ProcessGraphInvalidException, ProcessGraphMissingException
from openeo_driver.ProcessGraphDeserializer import custom_process_from_process_graph
from openeo_driver.ProcessGraphDeserializer import custom_process_from_process_graph, custom_process
from openeo_driver.processes import ProcessArgs, ProcessRegistry
from openeo_driver.testing import (
TEST_USER,
TEST_USER_BEARER_TOKEN,
Expand Down Expand Up @@ -3817,17 +3818,27 @@ def test_vector_buffer_returns_error_on_empty_result_geometry(api):


@pytest.mark.parametrize(
["request_costs", "expected_costs_header"],
["request_costs", "job_options", "expected_costs_header"],
[
# Default backend_implementation.request_costs
(None, None),
(None, None, None),
# request_costs override
(lambda user, request_id, success: 1234 + isinstance(user, User), "1235"),
(
lambda user, request_id, success, job_options: 1234 + isinstance(user, User),
None,
"1235",
),
# Extra job options handling
(
lambda user, request_id, success, job_options: 1234 * job_options.get("extra", 0),
{"extra": 2},
"2468",
),
],
)
@pytest.mark.parametrize("success", [False, True])
def test_synchronous_processing_request_costs(
api, backend_implementation, request_costs, success, expected_costs_header
api, backend_implementation, request_costs, job_options, success, expected_costs_header
):
if request_costs is None:
request_costs = backend_implementation.request_costs
Expand All @@ -3844,9 +3855,13 @@ def test_synchronous_processing_request_costs(
), mock.patch.object(
backend_implementation, "request_costs", side_effect=request_costs, autospec=request_costs
) as get_request_costs:
resp = api.result(
{"lc": {"process_id": "load_collection", "arguments": {"id": "S2_FAPAR_CLOUDCOVER"}, "result": True}}
)
api.ensure_auth_header()
pg = {"lc": {"process_id": "load_collection", "arguments": {"id": "S2_FAPAR_CLOUDCOVER"}, "result": True}}
post_data = {
"process": {"process_graph": pg},
"job_options": job_options,
}
resp = api.post(path="/result", json=post_data)
if success:
resp.assert_status_code(200)
if expected_costs_header:
Expand All @@ -3861,6 +3876,7 @@ def test_synchronous_processing_request_costs(

get_request_costs.assert_called_with(
user=User(TEST_USER, internal_auth_data={"authentication_method": "basic"}),
job_options=job_options,
success=success,
request_id="r-abc123",
)
Expand Down Expand Up @@ -4231,3 +4247,37 @@ def test_synchronous_processing_response_header_openeo_identifier(api):
res = api.result({"add1": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}})
assert res.assert_status_code(200).json == 8
assert res.headers["OpenEO-Identifier"] == "r-abc123"


@pytest.fixture
def custom_process_registry(backend_implementation) -> ProcessRegistry:
    """Provide a fresh ProcessRegistry, patched in as the backend's process registry for the test's duration."""
    registry = ProcessRegistry()
    patcher = mock.patch.object(backend_implementation.processing, "get_process_registry", return_value=registry)
    with patcher:
        yield registry


@pytest.mark.parametrize(
    ["post_data_base", "expected_job_options"],
    [
        ({}, None),
        ({"job_options": {"speed": "slow"}}, {"speed": "slow"}),
        ({"_x_speed": "slow"}, {"_x_speed": "slow"}),
    ],
)
def test_synchronous_processing_job_options(api, custom_process_registry, post_data_base, expected_job_options):
    """Check that job options of a synchronous processing request end up in the EvalEnv."""

    def i_spy_with_my_little_eye(args: ProcessArgs, env: EvalEnv):
        # Custom process that peeks at the EvalEnv to verify the job options passed along.
        assert env.get("job_options") == expected_job_options
        return args.get("x")

    custom_process_registry.add_function(i_spy_with_my_little_eye, spec={"id": "i_spy_with_my_little_eye"})

    process_graph = {"ispy": {"process_id": "i_spy_with_my_little_eye", "arguments": {"x": 123}, "result": True}}
    request_body = dict(post_data_base, process={"process_graph": process_graph})
    api.ensure_auth_header()
    response = api.post(path="/result", json=request_body)
    assert response.assert_status_code(200).json == 123

0 comments on commit 0709e42

Please sign in to comment.