Skip to content

Commit

Permalink
job_cost_calculator.py: finetune app state mapping (#531 related)
Browse files Browse the repository at this point in the history
Various doc tweaks as a side effect of getting to know the ETL implementation
  • Loading branch information
soxofaan committed Oct 13, 2023
1 parent 1f7fa04 commit 1450870
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 23 deletions.
1 change: 1 addition & 0 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,7 @@ def set_terrascope_access_token_getter(self, get_terrascope_access_token: Callab
self.batch_jobs.set_terrascope_access_token_getter(get_terrascope_access_token)

def request_costs(self, user_id: str, request_id: str, success: bool) -> Optional[float]:
"""Get resource usage cost associated with (current) synchronous processing request."""
return self._get_request_costs(user_id, request_id, success)


Expand Down
24 changes: 22 additions & 2 deletions openeogeotrellis/integrations/etl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@


class ETL_API_STATE:
# https://etl.terrascope.be/docs/#/resources/ResourcesController_upsertResource

"""
Possible values for the "state" field in the ETL API
(e.g. `POST /resources` endpoint https://etl.terrascope.be/docs/#/resources/ResourcesController_upsertResource).
Note that this roughly corresponds to YARN app state (for legacy reasons), also see `YARN_STATE`.
"""
ACCEPTED = "ACCEPTED"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
Expand All @@ -22,7 +25,24 @@ class ETL_API_STATE:
UNDEFINED = "UNDEFINED"


class ETL_API_STATUS:
"""
Possible values for the "status" field in the ETL API
(e.g. `POST /resources` endpoint https://etl.terrascope.be/docs/#/resources/ResourcesController_upsertResource).
Note that this roughly corresponds to YARN final status (for legacy reasons), also see `YARN_FINAL_STATUS`.
"""

SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
KILLED = "KILLED"
UNDEFINED = "UNDEFINED"


class EtlApi:
"""
API for reporting resource usage and added value to the ETL (EOPlaza marketplace) API
and deriving a cost estimate.
"""
def __init__(self, endpoint: str, source_id: str, requests_session: Optional[requests.Session] = None):
self._endpoint = endpoint
self._source_id = source_id
Expand Down
12 changes: 10 additions & 2 deletions openeogeotrellis/integrations/yarn.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@


class YARN_STATE:
# From https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/YarnApplicationState.html
"""
Enumeration of various states of a YARN ApplicationMaster.
Also see https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/YarnApplicationState.html
"""
ACCEPTED = "ACCEPTED"
FAILED = "FAILED"
FINISHED = "FINISHED"
Expand All @@ -18,7 +22,11 @@ class YARN_STATE:


class YARN_FINAL_STATUS:
# From https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.html
"""
Enumeration of various final states of a YARN Application.
Also see https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.html
"""
ENDED = "ENDED"
FAILED = "FAILED"
KILLED = "KILLED"
Expand Down
52 changes: 33 additions & 19 deletions openeogeotrellis/job_costs_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import logging
from typing import NamedTuple, Optional, List

from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATE
from openeogeotrellis.integrations.etl_api import EtlApi, ETL_API_STATE, ETL_API_STATUS
from openeogeotrellis.integrations.kubernetes import K8S_SPARK_APP_STATE

from openeogeotrellis.integrations.yarn import YARN_STATE

_log = logging.getLogger(__name__)


class CostsDetails(NamedTuple): # for lack of a better name
"""
Container for batch job details that are relevant for reporting resource usage and calculating costs.
"""
job_id: str
user_id: str
execution_id: str
Expand Down Expand Up @@ -40,12 +43,16 @@ def calculate_costs(self, details: CostsDetails) -> float:


class EtlApiJobCostsCalculator(JobCostsCalculator):
"""
Base class for cost calculators based on resource reporting with ETL API.
"""
def __init__(self, etl_api: EtlApi, etl_api_access_token: str):
self._etl_api = etl_api
self._etl_api_access_token = etl_api_access_token

@abc.abstractmethod
def etl_api_state(self, app_state: str) -> str:
"""Map an implementation-specific (YARN or Kubernetes) Spark app state to a standardized ETL_API_STATE value."""
raise NotImplementedError

def calculate_costs(self, details: CostsDetails) -> float:
Expand All @@ -61,7 +68,7 @@ def calculate_costs(self, details: CostsDetails) -> float:
started_ms=started_ms,
finished_ms=finished_ms,
state=self.etl_api_state(details.app_state),
status='UNDEFINED', # TODO: map as well? it's just for reporting
status=ETL_API_STATUS.UNDEFINED, # TODO: map as well? it's just for reporting
cpu_seconds=details.cpu_seconds,
mb_seconds=details.mb_seconds,
duration_ms=duration_ms,
Expand All @@ -88,26 +95,33 @@ def calculate_costs(self, details: CostsDetails) -> float:


class YarnJobCostsCalculator(EtlApiJobCostsCalculator):
def __init__(self, etl_api: EtlApi, etl_api_access_token: str):
super().__init__(etl_api, etl_api_access_token)
_yarn_state_to_etl_api_state = {
YARN_STATE.ACCEPTED: ETL_API_STATE.ACCEPTED,
YARN_STATE.RUNNING: ETL_API_STATE.RUNNING,
YARN_STATE.FINISHED: ETL_API_STATE.FINISHED,
YARN_STATE.KILLED: ETL_API_STATE.KILLED,
YARN_STATE.FAILED: ETL_API_STATE.FAILED,
}

def etl_api_state(self, app_state: str) -> str:
return app_state
if app_state not in self._yarn_state_to_etl_api_state:
_log.warning(f"Unhandled YARN app state mapping: {app_state}")
return self._yarn_state_to_etl_api_state.get(app_state, ETL_API_STATE.UNDEFINED)


class K8sJobCostsCalculator(EtlApiJobCostsCalculator):
def __init__(self, etl_api: EtlApi, etl_api_access_token: str):
super().__init__(etl_api, etl_api_access_token)
_k8s_state_to_etl_api_state = {
K8S_SPARK_APP_STATE.NEW: ETL_API_STATE.ACCEPTED,
K8S_SPARK_APP_STATE.SUBMITTED: ETL_API_STATE.ACCEPTED,
K8S_SPARK_APP_STATE.RUNNING: ETL_API_STATE.RUNNING,
K8S_SPARK_APP_STATE.SUCCEEDING: ETL_API_STATE.RUNNING,
K8S_SPARK_APP_STATE.COMPLETED: ETL_API_STATE.FINISHED,
K8S_SPARK_APP_STATE.FAILED: ETL_API_STATE.FAILED,
K8S_SPARK_APP_STATE.SUBMISSION_FAILED: ETL_API_STATE.FAILED,
K8S_SPARK_APP_STATE.FAILING: ETL_API_STATE.FAILED,
}

def etl_api_state(self, app_state: str) -> str:
if app_state in {K8S_SPARK_APP_STATE.NEW, K8S_SPARK_APP_STATE.SUBMITTED}:
return ETL_API_STATE.ACCEPTED
if app_state in {K8S_SPARK_APP_STATE.RUNNING, K8S_SPARK_APP_STATE.SUCCEEDING}:
return ETL_API_STATE.RUNNING
if app_state == K8S_SPARK_APP_STATE.COMPLETED:
return ETL_API_STATE.FINISHED
if app_state in {K8S_SPARK_APP_STATE.FAILED, K8S_SPARK_APP_STATE.SUBMISSION_FAILED,
K8S_SPARK_APP_STATE.FAILING}:
return ETL_API_STATE.FAILED

return ETL_API_STATE.ACCEPTED
if app_state not in self._k8s_state_to_etl_api_state:
_log.warning(f"Unhandled K8s app state mapping {app_state}")
return self._k8s_state_to_etl_api_state.get(app_state, ETL_API_STATE.UNDEFINED)

0 comments on commit 1450870

Please sign in to comment.