Skip to content

Commit

Permalink
Issue #531 implement DynamicEtlApiJobCostCalculator
Browse files Browse the repository at this point in the history
Add TTL based caching feature to `get_etl_api` to avoid full EtlApi setup (including Http session andOIDC client cred auth) repeatedly
  • Loading branch information
soxofaan committed Dec 15, 2023
1 parent 608663d commit b817ef1
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ without compromising stable operations.
## Unreleased


## 0.21.5

- Initial implementation of `DynamicEtlApiJobCostCalculator` and added caching feature to `get_etl_api()` ([#531](https://github.com/Open-EO/openeo-geopyspark-driver/issues/531))

## 0.21.4

- Support for reading GeoPackage vector data
Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.21.4a1"
__version__ = "0.21.5a1"
2 changes: 2 additions & 0 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,8 @@ def request_costs(
and flask.request.args.get(backend_config.etl_dynamic_api_flag)
),
requests_session=requests_session,
# TODO #531 provide a TtlCache here
etl_api_cache=None,
)

costs = etl_api.log_resource_usage(
Expand Down
49 changes: 41 additions & 8 deletions openeogeotrellis/integrations/etl_api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import os
from typing import Dict, Optional, Union
from typing import Dict, Optional, Union, Callable

import requests
from openeo_driver.config import get_backend_config
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials, ClientCredentialsAccessTokenHelper
from openeo_driver.util.caching import TtlCache

from openeogeotrellis.config import get_backend_config
from openeogeotrellis.config.config import EtlApiConfig
Expand Down Expand Up @@ -51,6 +52,8 @@ class EtlApi:
and deriving a cost estimate.
"""

__slots__ = ("_endpoint", "_source_id", "_session", "_access_token_helper")

def __init__(
self,
endpoint: str,
Expand All @@ -59,6 +62,7 @@ def __init__(
source_id: Optional[str] = None,
requests_session: Optional[requests.Session] = None,
):
_log.debug(f"EtlApi.__init__() with {endpoint=} {source_id=}")
self._endpoint = endpoint
self._source_id = source_id or get_backend_config().etl_source_id
self._session = requests_session or requests.Session()
Expand Down Expand Up @@ -237,27 +241,56 @@ def get_etl_api(
requests_session: Optional[requests.Session] = None,
# TODO #531 remove this temporary feature flag/toggle for dynamic ETL selection.
allow_dynamic_etl_api: bool = False,
etl_api_cache: Optional[TtlCache] = None,
) -> EtlApi:
"""Get EtlApi, possibly depending on additional data (pre-determined root_url, current user, ...)."""
"""
Get EtlApi, possibly depending on additional data (pre-determined root_url, current user, ...).
:param etl_api_cache: (optional) provide a `TtlCache` to fetch existing `EtlApi` instances from
to avoid repeatedly setting up `EtlApi` instances each time
(which can be costly due to client credentials related OIDC discovery requests).
Note that the caller is cache owner and responsible for providing the same cache instance on each call
and setting the default TTL (as desired).
"""
backend_config = get_backend_config()
etl_config: Optional[EtlApiConfig] = backend_config.etl_api_config

def get_cached_or_build(cache_key: tuple, build: Callable[[], EtlApi]) -> EtlApi:
"""Helper to build an EtlApi object, with optional caching."""
if etl_api_cache:
return etl_api_cache.get_or_call(cache_key, build)
else:
return build()

dynamic_etl_mode = allow_dynamic_etl_api and (etl_config is not None)
if dynamic_etl_mode:
_log.debug("get_etl_api: dynamic EtlApiConfig based ETL API selection")
# First get root URL as main ETL API identifier
if root_url is None:
root_url = etl_config.get_root_url(user=user, job_options=job_options)
client_credentials = etl_config.get_client_credentials(root_url=root_url)
return EtlApi(endpoint=root_url, credentials=client_credentials, requests_session=requests_session)

# Build EtlApi (or get from cache if possible)
return get_cached_or_build(
cache_key=("get_etl_api", root_url),
build=lambda: EtlApi(
endpoint=root_url,
credentials=etl_config.get_client_credentials(root_url=root_url),
requests_session=requests_session,
),
)
else:
# TODO #531 eliminate this code path
_log.debug("get_etl_api: legacy static EtlApi")
return EtlApi(
endpoint=backend_config.etl_api,
credentials=get_etl_api_credentials_from_env(),
requests_session=requests_session,
return get_cached_or_build(
cache_key=("get_etl_api", "__static_etl_api__"),
build=lambda: EtlApi(
endpoint=backend_config.etl_api,
credentials=get_etl_api_credentials_from_env(),
requests_session=requests_session,
),
)


def assert_resource_logging_possible():
# TODO: still necessary to keep this function around, or was this just a temp debugging thing?

Expand Down
27 changes: 26 additions & 1 deletion openeogeotrellis/job_costs_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
from typing import NamedTuple, Optional, List

from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATUS
from openeo_driver.util.caching import TtlCache
from openeo_driver.util.http import requests_with_retry
from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATUS, get_etl_api

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +50,7 @@ class EtlApiJobCostsCalculator(JobCostsCalculator):
Base class for cost calculators based on resource reporting with ETL API.
"""
def __init__(self, etl_api: EtlApi):
super().__init__()
self._etl_api = etl_api

def calculate_costs(self, details: CostsDetails) -> float:
Expand Down Expand Up @@ -88,3 +91,25 @@ def calculate_costs(self, details: CostsDetails) -> float:
) for process_id in details.unique_process_ids)

return resource_costs_in_credits + added_value_costs_in_credits


class DynamicEtlApiJobCostCalculator(JobCostsCalculator):
"""
Like EtlApiJobCostsCalculator but with an ETL API endpoint that is determined based on user or job data
"""

def __init__(self, cache_ttl: int = 5 * 60):
self._request_session = requests_with_retry(total=3, backoff_factor=2)
self._etl_cache: Optional[TtlCache] = TtlCache(default_ttl=cache_ttl) if cache_ttl > 0 else None

def calculate_costs(self, details: CostsDetails) -> float:
job_options = details.job_options or {}
etl_api = get_etl_api(
job_options=job_options,
allow_dynamic_etl_api=True,
requests_session=self._request_session,
etl_api_cache=self._etl_cache,
)
_log.debug(f"DynamicEtlApiJobCostCalculator.calculate_costs with {etl_api=}")
# Reuse logic from EtlApiJobCostsCalculator
return EtlApiJobCostsCalculator(etl_api=etl_api).calculate_costs(details=details)
61 changes: 61 additions & 0 deletions tests/integrations/test_etl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from openeo.rest.auth.testing import OidcMock
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials
from openeo_driver.util.caching import TtlCache
from openeogeotrellis.integrations.etl_api import (
ETL_API_STATE,
get_etl_api_credentials_from_env,
Expand Down Expand Up @@ -146,3 +147,63 @@ def test_dynamic_etl_api_with_job_options(self, custom_etl_api_config, requests_
assert isinstance(etl_api, EtlApi)
assert etl_api.root_url == "https://etl.planb.test"
self.assert_etl_access_token(etl_api=etl_api, requests_mock=requests_mock, oidc_mock=oidc_mock)

def test_legacy_mode_with_caching(self, etl_credentials_in_env, time_machine, oidc_mock):
assert oidc_mock.mocks["oidc_discovery"].call_count == 0

time_machine.move_to("2023-04-05T12:00:00Z")
etl_api_cache = TtlCache(default_ttl=60)
etl_api1 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api1, EtlApi)
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:00:10Z")
etl_api2 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api2, EtlApi)
assert etl_api2 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:30:00Z")
etl_api3 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api3, EtlApi)
assert etl_api3 is not etl_api1
assert etl_api3 is not etl_api2
assert oidc_mock.mocks["oidc_discovery"].call_count == 2
etl_api4 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api4, EtlApi)
assert etl_api4 is etl_api3
assert oidc_mock.mocks["oidc_discovery"].call_count == 2

def test_dynamic_mode_with_caching(self, custom_etl_api_config, time_machine, oidc_mock):
with gps_config_overrides(etl_api_config=custom_etl_api_config):
etl_api_cache = TtlCache(default_ttl=60)
assert oidc_mock.mocks["oidc_discovery"].call_count == 0

time_machine.move_to("2023-04-05T12:00:00Z")
etl_api1 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api1, EtlApi)
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

etl_api2 = get_etl_api(user=User("alphonse"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api2, EtlApi)
assert etl_api2 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:00:10Z")
etl_api3 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api3, EtlApi)
assert etl_api3 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:30:00Z")
etl_api4 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api4, EtlApi)
assert etl_api4 is not etl_api1
assert etl_api4 is not etl_api2
assert etl_api4 is not etl_api3
assert oidc_mock.mocks["oidc_discovery"].call_count == 2

etl_api5 = get_etl_api(user=User("alfred"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api5, EtlApi)
assert etl_api5 is etl_api4
assert oidc_mock.mocks["oidc_discovery"].call_count == 2
111 changes: 111 additions & 0 deletions tests/test_job_cost_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Optional, List

import pytest

from openeo.rest.auth.testing import OidcMock
from openeo_driver.testing import DictSubSet
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials
from openeogeotrellis.integrations.etl_api import DynamicEtlApiConfig
from openeogeotrellis.job_costs_calculator import DynamicEtlApiJobCostCalculator, CostsDetails
from openeogeotrellis.testing import gps_config_overrides


class TestDynamicEtlApiJobCostCalculator:
@pytest.fixture
def etl_credentials(self) -> ClientCredentials:
"""Default client credentials for ETL API access"""
return ClientCredentials(oidc_issuer="https://oidc.test", client_id="client123", client_secret="s3cr3t")

@pytest.fixture
def custom_etl_api_config(self, etl_credentials):
class CustomEtlConfig(DynamicEtlApiConfig):
def get_root_url(self, *, user: Optional[User] = None, job_options: Optional[dict] = None) -> str:
return {"alt": "https://etl-alt.test", "planb": "https://etl.planb.test"}[job_options["my_etl"]]

return CustomEtlConfig(
urls_and_credentials={
# Note using same credentials for all ETL API instances, to keep testing here simple
"https://etl-alt.test": etl_credentials,
"https://etl.planb.test": etl_credentials,
}
)

@pytest.fixture(autouse=True)
def oidc_mock(self, requests_mock, etl_credentials: ClientCredentials) -> OidcMock:
oidc_mock = OidcMock(
requests_mock=requests_mock,
oidc_issuer=etl_credentials.oidc_issuer,
expected_grant_type="client_credentials",
expected_client_id=etl_credentials.client_id,
expected_fields={"client_secret": etl_credentials.client_secret, "scope": "openid"},
)
return oidc_mock

def _build_post_resources_handler(self, oidc_mock: OidcMock, expected_data: dict, expected_result: List[dict]):
"""Build a requests_mock handler for etl api `POST /resources` request"""

def post_resources(request, context):
"""Handler for etl api `POST /resources` request"""
assert request.headers["Authorization"] == "Bearer " + oidc_mock.state["access_token"]
assert request.json() == DictSubSet(expected_data)
return expected_result

return post_resources

def test_calculate_cost(self, custom_etl_api_config, requests_mock, oidc_mock):
with gps_config_overrides(etl_api_config=custom_etl_api_config):
calculator = DynamicEtlApiJobCostCalculator()

mock_alt = requests_mock.post(
"https://etl-alt.test/resources",
json=self._build_post_resources_handler(
oidc_mock,
expected_data={
"jobId": "job-123",
"userId": "john",
"executionId": "exec123",
"state": "FINISHED",
"status": "UNDEFINED", # TODO #610
},
expected_result=[{"cost": 33}, {"cost": 55}],
),
)
mock_planb = requests_mock.post(
"https://etl.planb.test/resources",
json=self._build_post_resources_handler(
oidc_mock,
expected_data={
"jobId": "job-456",
"userId": "john",
"executionId": "exec456",
"state": "FAILED",
"status": "UNDEFINED", # TODO #610
},
expected_result=[{"cost": 100}, {"cost": 2000}],
),
)

costs_details = CostsDetails(
job_id="job-123",
user_id="john",
execution_id="exec123",
job_options={"my_etl": "alt"},
app_state_etl_api_deprecated="FINISHED",
job_status="finished",
)
costs = calculator.calculate_costs(costs_details)
assert costs == 88.0
assert (mock_alt.call_count, mock_planb.call_count) == (1, 0)

costs_details = CostsDetails(
job_id="job-456",
user_id="john",
execution_id="exec456",
job_options={"my_etl": "planb"},
app_state_etl_api_deprecated="FAILED",
job_status="failed",
)
costs = calculator.calculate_costs(costs_details)
assert costs == 2100.0
assert (mock_alt.call_count, mock_planb.call_count) == (1, 1)

0 comments on commit b817ef1

Please sign in to comment.