Skip to content

Commit

Permalink
Issue #531 implement DynamicEtlApiJobCostCalculator
Browse files Browse the repository at this point in the history
Add TTL based caching feature to `get_etl_api` to avoid full EtlApi setup (including Http session andOIDC client cred auth) repeatedly
  • Loading branch information
soxofaan committed Dec 15, 2023
1 parent 608663d commit ebab59b
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ without compromising stable operations.
## Unreleased


## 0.21.5

- Initial implementation of `DynamicEtlApiJobCostCalculator` and added caching feature to `get_etl_api()` ([#531](https://github.com/Open-EO/openeo-geopyspark-driver/issues/531))

## 0.21.4

- Support for reading GeoPackage vector data
Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.21.4a1"
__version__ = "0.21.5a1"
2 changes: 2 additions & 0 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,8 @@ def request_costs(
and flask.request.args.get(backend_config.etl_dynamic_api_flag)
),
requests_session=requests_session,
# No EtlApi caching to avoid issues with expiring access tokens.
etl_api_cache=None,
)

costs = etl_api.log_resource_usage(
Expand Down
39 changes: 32 additions & 7 deletions openeogeotrellis/integrations/etl_api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import os
from typing import Dict, Optional, Union
from typing import Dict, Optional, Union, Callable

import requests
from openeo_driver.config import get_backend_config
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials, ClientCredentialsAccessTokenHelper
from openeo_driver.util.caching import TtlCache

from openeogeotrellis.config import get_backend_config
from openeogeotrellis.config.config import EtlApiConfig
Expand Down Expand Up @@ -51,6 +52,8 @@ class EtlApi:
and deriving a cost estimate.
"""

__slots__ = ("_endpoint", "_source_id", "_session", "_access_token_helper")

def __init__(
self,
endpoint: str,
Expand All @@ -59,6 +62,7 @@ def __init__(
source_id: Optional[str] = None,
requests_session: Optional[requests.Session] = None,
):
_log.debug(f"EtlApi.__init__() with {endpoint}")
self._endpoint = endpoint
self._source_id = source_id or get_backend_config().etl_source_id
self._session = requests_session or requests.Session()
Expand Down Expand Up @@ -237,27 +241,48 @@ def get_etl_api(
requests_session: Optional[requests.Session] = None,
# TODO #531 remove this temporary feature flag/toggle for dynamic ETL selection.
allow_dynamic_etl_api: bool = False,
etl_api_cache: Optional[TtlCache] = None,
) -> EtlApi:
"""Get EtlApi, possibly depending on additional data (pre-determined root_url, current user, ...)."""
backend_config = get_backend_config()
etl_config: Optional[EtlApiConfig] = backend_config.etl_api_config

def get_cached_or_build(cache_key: str, build: Callable[[], EtlApi]) -> EtlApi:
"""Helper to build an EtlApi object, with optional caching."""
if etl_api_cache:
return etl_api_cache.get_or_call(cache_key, build)
else:
return build()

dynamic_etl_mode = allow_dynamic_etl_api and (etl_config is not None)
if dynamic_etl_mode:
_log.debug("get_etl_api: dynamic EtlApiConfig based ETL API selection")
# First get root URL as main ETL API identifier
if root_url is None:
root_url = etl_config.get_root_url(user=user, job_options=job_options)
client_credentials = etl_config.get_client_credentials(root_url=root_url)
return EtlApi(endpoint=root_url, credentials=client_credentials, requests_session=requests_session)

# Build EtlApi (or get from cache if possible)
return get_cached_or_build(
cache_key=root_url,
build=lambda: EtlApi(
endpoint=root_url,
credentials=etl_config.get_client_credentials(root_url=root_url),
requests_session=requests_session,
),
)
else:
# TODO #531 eliminate this code path
_log.debug("get_etl_api: legacy static EtlApi")
return EtlApi(
endpoint=backend_config.etl_api,
credentials=get_etl_api_credentials_from_env(),
requests_session=requests_session,
return get_cached_or_build(
cache_key="_",
build=lambda: EtlApi(
endpoint=backend_config.etl_api,
credentials=get_etl_api_credentials_from_env(),
requests_session=requests_session,
),
)


def assert_resource_logging_possible():
# TODO: still necessary to keep this function around, or was this just a temp debugging thing?

Expand Down
27 changes: 26 additions & 1 deletion openeogeotrellis/job_costs_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging
from typing import NamedTuple, Optional, List

from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATUS
from openeo_driver.util.caching import TtlCache
from openeo_driver.util.http import requests_with_retry
from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATUS, get_etl_api

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +50,7 @@ class EtlApiJobCostsCalculator(JobCostsCalculator):
Base class for cost calculators based on resource reporting with ETL API.
"""
def __init__(self, etl_api: EtlApi):
super().__init__()
self._etl_api = etl_api

def calculate_costs(self, details: CostsDetails) -> float:
Expand Down Expand Up @@ -88,3 +91,25 @@ def calculate_costs(self, details: CostsDetails) -> float:
) for process_id in details.unique_process_ids)

return resource_costs_in_credits + added_value_costs_in_credits


class DynamicEtlApiJobCostCalculator(JobCostsCalculator):
"""
Like EtlApiJobCostsCalculator but with an ETL API endpoint that is determined based on user or job data
"""

def __init__(self, cache_ttl: int = 5 * 60):
self._request_session = requests_with_retry(total=3, backoff_factor=2)
self._etl_cache: Optional[TtlCache] = TtlCache(default_ttl=cache_ttl) if cache_ttl > 0 else None

def calculate_costs(self, details: CostsDetails) -> float:
job_options = details.job_options or {}
etl_api = get_etl_api(
job_options=job_options,
allow_dynamic_etl_api=True,
requests_session=self._request_session,
etl_api_cache=self._etl_cache,
)
_log.debug(f"DynamicEtlApiJobCostCalculator.calculate_costs with {etl_api=}")
# Reuse logic from EtlApiJobCostsCalculator
return EtlApiJobCostsCalculator(etl_api=etl_api).calculate_costs(details=details)
61 changes: 61 additions & 0 deletions tests/integrations/test_etl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from openeo.rest.auth.testing import OidcMock
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials
from openeo_driver.util.caching import TtlCache
from openeogeotrellis.integrations.etl_api import (
ETL_API_STATE,
get_etl_api_credentials_from_env,
Expand Down Expand Up @@ -146,3 +147,63 @@ def test_dynamic_etl_api_with_job_options(self, custom_etl_api_config, requests_
assert isinstance(etl_api, EtlApi)
assert etl_api.root_url == "https://etl.planb.test"
self.assert_etl_access_token(etl_api=etl_api, requests_mock=requests_mock, oidc_mock=oidc_mock)

def test_legacy_mode_with_caching(self, etl_credentials_in_env, time_machine, oidc_mock):
assert oidc_mock.mocks["oidc_discovery"].call_count == 0

time_machine.move_to("2023-04-05T12:00:00Z")
etl_api_cache = TtlCache(default_ttl=60)
etl_api1 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api1, EtlApi)
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:00:10Z")
etl_api2 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api2, EtlApi)
assert etl_api2 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:30:00Z")
etl_api3 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api3, EtlApi)
assert etl_api3 is not etl_api1
assert etl_api3 is not etl_api2
assert oidc_mock.mocks["oidc_discovery"].call_count == 2
etl_api4 = get_etl_api(etl_api_cache=etl_api_cache)
assert isinstance(etl_api4, EtlApi)
assert etl_api4 is etl_api3
assert oidc_mock.mocks["oidc_discovery"].call_count == 2

def test_dynamic_mode_with_caching(self, custom_etl_api_config, time_machine, oidc_mock):
with gps_config_overrides(etl_api_config=custom_etl_api_config):
etl_api_cache = TtlCache(default_ttl=60)
assert oidc_mock.mocks["oidc_discovery"].call_count == 0

time_machine.move_to("2023-04-05T12:00:00Z")
etl_api1 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api1, EtlApi)
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

etl_api2 = get_etl_api(user=User("alphonse"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api2, EtlApi)
assert etl_api2 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:00:10Z")
etl_api3 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api3, EtlApi)
assert etl_api3 is etl_api1
assert oidc_mock.mocks["oidc_discovery"].call_count == 1

time_machine.move_to("2023-04-05T12:30:00Z")
etl_api4 = get_etl_api(user=User("alice"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api4, EtlApi)
assert etl_api4 is not etl_api1
assert etl_api4 is not etl_api2
assert etl_api4 is not etl_api3
assert oidc_mock.mocks["oidc_discovery"].call_count == 2

etl_api5 = get_etl_api(user=User("alfred"), allow_dynamic_etl_api=True, etl_api_cache=etl_api_cache)
assert isinstance(etl_api5, EtlApi)
assert etl_api5 is etl_api4
assert oidc_mock.mocks["oidc_discovery"].call_count == 2
111 changes: 111 additions & 0 deletions tests/test_job_cost_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Optional, List

import pytest

from openeo.rest.auth.testing import OidcMock
from openeo_driver.testing import DictSubSet
from openeo_driver.users import User
from openeo_driver.util.auth import ClientCredentials
from openeogeotrellis.integrations.etl_api import DynamicEtlApiConfig
from openeogeotrellis.job_costs_calculator import DynamicEtlApiJobCostCalculator, CostsDetails
from openeogeotrellis.testing import gps_config_overrides


class TestDynamicEtlApiJobCostCalculator:
@pytest.fixture
def etl_credentials(self) -> ClientCredentials:
"""Default client credentials for ETL API access"""
return ClientCredentials(oidc_issuer="https://oidc.test", client_id="client123", client_secret="s3cr3t")

@pytest.fixture
def custom_etl_api_config(self, etl_credentials):
class CustomEtlConfig(DynamicEtlApiConfig):
def get_root_url(self, *, user: Optional[User] = None, job_options: Optional[dict] = None) -> str:
return {"alt": "https://etl-alt.test", "planb": "https://etl.planb.test"}[job_options["my_etl"]]

return CustomEtlConfig(
urls_and_credentials={
# Note using same credentials for all ETL API instances, to keep testing here simple
"https://etl-alt.test": etl_credentials,
"https://etl.planb.test": etl_credentials,
}
)

@pytest.fixture(autouse=True)
def oidc_mock(self, requests_mock, etl_credentials: ClientCredentials) -> OidcMock:
oidc_mock = OidcMock(
requests_mock=requests_mock,
oidc_issuer=etl_credentials.oidc_issuer,
expected_grant_type="client_credentials",
expected_client_id=etl_credentials.client_id,
expected_fields={"client_secret": etl_credentials.client_secret, "scope": "openid"},
)
return oidc_mock

def _build_post_resources_handler(self, oidc_mock: OidcMock, expected_data: dict, expected_result: List[dict]):
"""Build a requests_mock handler for etl api `POST /resources` request"""

def post_resources(request, context):
"""Handler for etl api `POST /resources` request"""
assert request.headers["Authorization"] == "Bearer " + oidc_mock.state["access_token"]
assert request.json() == DictSubSet(expected_data)
return expected_result

return post_resources

def test_calculate_cost(self, custom_etl_api_config, requests_mock, oidc_mock):
with gps_config_overrides(etl_api_config=custom_etl_api_config):
calculator = DynamicEtlApiJobCostCalculator()

mock_alt = requests_mock.post(
"https://etl-alt.test/resources",
json=self._build_post_resources_handler(
oidc_mock,
expected_data={
"jobId": "job-123",
"userId": "john",
"executionId": "exec123",
"state": "FINISHED",
"status": "UNDEFINED", # TODO #610
},
expected_result=[{"cost": 33}, {"cost": 55}],
),
)
mock_planb = requests_mock.post(
"https://etl.planb.test/resources",
json=self._build_post_resources_handler(
oidc_mock,
expected_data={
"jobId": "job-456",
"userId": "john",
"executionId": "exec456",
"state": "FAILED",
"status": "UNDEFINED", # TODO #610
},
expected_result=[{"cost": 100}, {"cost": 2000}],
),
)

costs_details = CostsDetails(
job_id="job-123",
user_id="john",
execution_id="exec123",
job_options={"my_etl": "alt"},
app_state_etl_api_deprecated="FINISHED",
job_status="finished",
)
costs = calculator.calculate_costs(costs_details)
assert costs == 88.0
assert (mock_alt.call_count, mock_planb.call_count) == (1, 0)

costs_details = CostsDetails(
job_id="job-456",
user_id="john",
execution_id="exec456",
job_options={"my_etl": "planb"},
app_state_etl_api_deprecated="FAILED",
job_status="failed",
)
costs = calculator.calculate_costs(costs_details)
assert costs == 2100.0
assert (mock_alt.call_count, mock_planb.call_count) == (1, 1)

0 comments on commit ebab59b

Please sign in to comment.