Skip to content

Commit

Permalink
Issue #236 EJR: initial unit test coverage for job_tracker with YARN+ZK
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 13, 2022
1 parent 20bea51 commit 8852a8d
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 13 deletions.
24 changes: 18 additions & 6 deletions openeogeotrellis/job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@
from decimal import Decimal
from typing import List, Dict, Callable, Union, Optional
import logging
from urllib.parse import urlparse

from deprecated import deprecated
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError
from urllib.parse import urlparse

from openeo.util import rfc3339
from openeo_driver.backend import BatchJobMetadata
from openeo_driver.errors import JobNotFoundException
from openeo_driver.jobregistry import JOB_STATUS
from openeogeotrellis.configparams import ConfigParams
from openeogeotrellis import sentinel_hub
from openeo_driver.errors import JobNotFoundException
from openeogeotrellis.testing import KazooClientMock


_log = logging.getLogger(__name__)


class ZkJobRegistry:
# TODO: improve encapsulation
def __init__(self, root_path: str = ConfigParams().batch_jobs_zookeeper_root_path,
zookeeper_hosts: str = ','.join(ConfigParams().zookeepernodes)):
self._root = root_path
self._zk = KazooClient(hosts=zookeeper_hosts)
def __init__(
self,
root_path: Optional[str] = None,
zk_client: Union[str, KazooClient, KazooClientMock, None] = None,
):
self._root = root_path or ConfigParams().batch_jobs_zookeeper_root_path
if zk_client is None:
zk_client = KazooClient(hosts=",".join(ConfigParams().zookeepernodes))
elif isinstance(zk_client, str):
zk_client = KazooClient(hosts=zk_client)
self._zk = zk_client

def ensure_paths(self):
self._zk.ensure_path(self._ongoing())
Expand Down Expand Up @@ -124,6 +132,9 @@ def set_status(self, job_id: str, user_id: str, status: str) -> None:
self.patch(job_id, user_id, status=status, updated=rfc3339.datetime(datetime.utcnow()))
_log.debug("batch job {j} -> {s}".format(j=job_id, s=status))

def get_status(self, job_id: str, user_id: str) -> str:
return self.get_job(job_id=job_id, user_id=user_id)["status"]

def set_dependency_status(self, job_id: str, user_id: str, dependency_status: str) -> None:
self.patch(job_id, user_id, dependency_status=dependency_status)
_log.debug("batch job {j} dependency -> {s}".format(j=job_id, s=dependency_status))
Expand Down Expand Up @@ -325,4 +336,5 @@ def _done(self, user_id: str = None, job_id: str = None) -> str:


# Legacy alias
# TODO: remove this legacy alias
JobRegistry = ZkJobRegistry
28 changes: 21 additions & 7 deletions openeogeotrellis/job_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

_log = logging.getLogger(__name__)

# TODO: make this job tracker logic an internal implementation detail of JobRegistry?
# TODO: current implementation mixes YARN and Kubernetes logic. Instead use composition/inheritance for better separation of concerns?
# Especially because the job registry storage will also get different options: legacy ZooKeeper and ElasticJobRegistry (and maybe even a simple in-memory option)


class JobTracker:
Expand All @@ -44,6 +45,7 @@ def __init__(self, job_registry: Callable[[], ZkJobRegistry], principal: str, ke
self._batch_jobs = GpsBatchJobs(catalog=None, jvm=None, principal=principal, key_tab=keytab, vault=None)

def loop_update_statuses(self, interval_s: int = 60):
# TODO: this method seems to be unused
with self._job_registry() as registry:
registry.ensure_paths()

Expand Down Expand Up @@ -74,9 +76,13 @@ def update_statuses(self) -> None:
jobs_to_track = registry.get_running_jobs()

for job_info in jobs_to_track:
job_id = None
user_id = None
try:
job_id, user_id = job_info['job_id'], job_info['user_id']
application_id, current_status = job_info['application_id'], job_info['status']
job_id = job_info["job_id"]
user_id = job_info["user_id"]
application_id = job_info["application_id"]
current_status = job_info["status"]

if application_id:
try:
Expand All @@ -91,6 +97,7 @@ def update_statuses(self) -> None:
started=start_time,
finished=finish_time)
with ElasticJobRegistry.just_log_errors(f"job_tracker update status {new_status}"):
# TODO: also set started/finished
self._batch_jobs._elastic_job_registry.set_status(job_id, new_status)

if current_status != new_status:
Expand Down Expand Up @@ -125,6 +132,7 @@ def update_statuses(self) -> None:
memory_time_megabyte_seconds=memory_time_megabyte_seconds,
cpu_time_seconds=cpu_time_seconds)
with ElasticJobRegistry.just_log_errors(f"job_tracker update status from YARN"):
# TODO: also set started/finished, ...
self._batch_jobs._elastic_job_registry.set_status(job_id, new_status)

if current_status != new_status:
Expand Down Expand Up @@ -164,10 +172,14 @@ def update_statuses(self) -> None:
except JobTracker._UnknownApplicationIdException:
registry.mark_done(job_id, user_id)
except Exception:
_log.warning("resuming with remaining jobs after failing to handle batch job {j}", exc_info=True,
extra={'job_id': job_id})
registry.set_status(job_id, user_id, JOB_STATUS.ERROR)
registry.mark_done(job_id, user_id)
_log.warning(
f"resuming with remaining jobs after failing to handle batch job {job_id}",
exc_info=True,
extra={"job_id": job_id},
)
if job_id and user_id:
registry.set_status(job_id, user_id, JOB_STATUS.ERROR)
registry.mark_done(job_id, user_id)

def get_kube_usage(self,job_id,user_id):
usage = None
Expand Down Expand Up @@ -204,6 +216,7 @@ def get_kube_usage(self,job_id,user_id):
@staticmethod
def yarn_available() -> bool:
"""Check if YARN tools are available."""
# TODO: this method seems to be unused?
try:
_log.info("Checking if Hadoop 'yarn' tool is available")
output = subprocess.check_output(["yarn", "version"]).decode("ascii")
Expand Down Expand Up @@ -333,6 +346,7 @@ def _refresh_kerberos_tgt(self):
if __name__ == '__main__':
import argparse

# TODO: (re)use central logging setup helpers from `openeo_driver.util.logging`
logging.basicConfig(level=logging.INFO)
openeogeotrellis.backend.logger.setLevel(logging.DEBUG)
kazoo.client.log.setLevel(logging.WARNING)
Expand Down
2 changes: 2 additions & 0 deletions openeogeotrellis/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def dump(self, root=None) -> Iterator[Tuple[str, bytes]]:
class KazooClientMock:
"""Simple mock for KazooClient that stores data in memory"""

# TODO: unify (for better reuse/sharing) with DummyKazooClient from https://github.com/Open-EO/openeo-aggregator/blob/720fc26311c9a377cd45dfb2dc6b81616adb5850/src/openeo_aggregator/testing.py#L17

def __init__(self, root: _ZNode = None):
"""Create client and optionally initialize state of root node (and its children)."""
self.root = root or _ZNode()
Expand Down
205 changes: 205 additions & 0 deletions tests/test_job_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
from unittest import mock

import contextlib
import pytest
import subprocess
from dataclasses import dataclass
from typing import Tuple, List, Optional, Dict

from openeo_driver.utils import generate_unique_id
from openeogeotrellis.job_registry import ZkJobRegistry
from openeogeotrellis.job_tracker import JobTracker
from openeogeotrellis.testing import KazooClientMock

# TODO: move YARN related mocks to openeogeotrellis.testing


class YARN_STATE:
    """String constants for YARN application states, mirroring
    https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/YarnApplicationState.html
    """

    # TODO: move this to openeogeotrellis.job_tracker
    NEW = "NEW"
    NEW_SAVING = "NEW_SAVING"
    SUBMITTED = "SUBMITTED"
    ACCEPTED = "ACCEPTED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"
    KILLED = "KILLED"


class YARN_FINAL_STATUS:
    """String constants for YARN final application statuses, mirroring
    https://hadoop.apache.org/docs/r3.1.1/api/org/apache/hadoop/yarn/api/records/FinalApplicationStatus.html
    """

    # TODO: move this to openeogeotrellis.job_tracker
    UNDEFINED = "UNDEFINED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    KILLED = "KILLED"
    ENDED = "ENDED"


@dataclass
class YarnAppInfo:
    """Dummy YARN app metadata record, with helpers to simulate state transitions."""

    app_id: str
    user_id: str = "johndoe"
    queue: str = "default"
    start_time: int = 0
    finish_time: int = 0
    progress: str = "0%"
    state: str = YARN_STATE.SUBMITTED
    final_state: str = YARN_FINAL_STATUS.UNDEFINED
    aggregate_resource_allocation: Tuple[int, int] = (1234, 32)

    def set_state(self, state: str, final_state: str) -> "YarnAppInfo":
        """Update the (state, final-state) pair; returns self for chaining."""
        self.state, self.final_state = state, final_state
        return self

    def set_submitted(self):
        return self.set_state(YARN_STATE.SUBMITTED, YARN_FINAL_STATUS.UNDEFINED)

    def set_accepted(self):
        return self.set_state(YARN_STATE.ACCEPTED, YARN_FINAL_STATUS.UNDEFINED)

    def set_running(self):
        return self.set_state(YARN_STATE.RUNNING, YARN_FINAL_STATUS.UNDEFINED)

    def set_finished(self, final_state: str = YARN_FINAL_STATUS.SUCCEEDED):
        assert final_state in {YARN_FINAL_STATUS.SUCCEEDED, YARN_FINAL_STATUS.FAILED}
        # TODO: what is the meaning actually of state=FINISHED + final-state=FAILED?
        return self.set_state(YARN_STATE.FINISHED, final_state)

    def set_failed(self):
        return self.set_state(YARN_STATE.FAILED, YARN_FINAL_STATUS.FAILED)

    def set_killed(self):
        return self.set_state(YARN_STATE.KILLED, YARN_FINAL_STATUS.KILLED)

    def status_report(self) -> str:
        """Render this app's metadata like the report of `yarn application -status`."""
        mb_seconds, vcore_seconds = self.aggregate_resource_allocation
        report = [
            ("Application-Id", self.app_id),
            ("User", self.user_id),
            ("Queue", self.queue),
            ("Start-Time", self.start_time),
            ("Finish-Time", self.finish_time),
            ("Progress", self.progress),
            ("State", self.state),
            ("Final-State", self.final_state),
            (
                "Aggregate Resource Allocation",
                f"{mb_seconds} MB-seconds, {vcore_seconds} vcore-seconds",
            ),
        ]
        lines = ["Application Report : "]
        lines.extend(f"\t{key} : {value}" for key, value in report)
        lines.append("")
        return "\n".join(lines)


class YarnMock:
    """In-memory stand-in for a YARN cluster: holds dummy apps and fakes the `yarn` CLI."""

    def __init__(self):
        # Registered dummy apps, keyed by application id.
        self.apps: Dict[str, YarnAppInfo] = {}

    def submit(self, app_id: Optional[str] = None, **kwargs) -> YarnAppInfo:
        """Register a new (dummy) YARN app and return its metadata record."""
        if app_id is None:
            app_id = generate_unique_id(prefix="app")
        app = YarnAppInfo(app_id=app_id, **kwargs)
        self.apps[app_id] = app
        return app

    def _check_output(self, args: List[str]):
        """Mock for subprocess.check_output(["yarn", ...])"""
        # Only `yarn application -status <app-id>` is supported.
        if len(args) == 4 and args[:3] == ["yarn", "application", "-status"]:
            app_id = args[3]
            if app_id not in self.apps:
                # Mimic the real CLI's failure for an unknown application id.
                raise subprocess.CalledProcessError(
                    returncode=255,
                    cmd=args,
                    output=f"Application with id '{app_id}' doesn't exist in RM or Timeline Server.",
                )
            return self.apps[app_id].status_report().encode("utf-8")

        raise RuntimeError(f"Unsupported check_output({args!r})")

    @contextlib.contextmanager
    def patch(self):
        """Context manager that redirects subprocess.check_output to this mock."""
        patcher = mock.patch("subprocess.check_output", new=self._check_output)
        with patcher as check_output:
            yield check_output


@pytest.fixture
def zk_client() -> KazooClientMock:
    """Fresh in-memory ZooKeeper client mock per test."""
    return KazooClientMock()


@pytest.fixture
def zk_job_registry(zk_client) -> ZkJobRegistry:
    """Job registry backed by the in-memory ZooKeeper client mock."""
    return ZkJobRegistry(zk_client=zk_client)


@pytest.fixture
def yarn() -> YarnMock:
    """YARN cluster mock, with subprocess.check_output patched for the test's duration."""
    yarn_mock = YarnMock()
    with yarn_mock.patch():
        yield yarn_mock


class TestJobTracker:
    @pytest.fixture
    def job_tracker(self, zk_job_registry) -> JobTracker:
        """JobTracker wired to the mocked ZooKeeper-backed job registry."""
        return JobTracker(
            job_registry=lambda: zk_job_registry,
            principal="john@EXAMPLE.TEST",
            keytab="test/openeo.keytab",
        )

    def test_basic_yarn_zookeeper(self, zk_job_registry, yarn, job_tracker):
        """Walk one batch job through the YARN lifecycle and check that the
        job tracker propagates each state to the ZooKeeper job registry."""
        user_id = "john"
        job_id = "job-123"
        # Register new job in zookeeper and yarn
        yarn_app = yarn.submit(app_id="app-123")
        zk_job_registry.register(
            job_id=job_id, user_id=user_id, api_version="1.2.3", specification={}
        )
        zk_job_registry.set_application_id(
            job_id=job_id, user_id=user_id, application_id=yarn_app.app_id
        )

        def zk_status() -> str:
            return zk_job_registry.get_status(job_id=job_id, user_id=user_id)

        # Check initial status in registry
        assert zk_status() == "created"

        # Drive the YARN app through its lifecycle and check the registry
        # status after each tracker pass.
        lifecycle = [
            (yarn_app.set_submitted, "created"),
            (yarn_app.set_accepted, "queued"),
            (yarn_app.set_running, "running"),
            (yarn_app.set_finished, "finished"),
        ]
        for set_yarn_state, expected_status in lifecycle:
            set_yarn_state()
            job_tracker.update_statuses()
            assert zk_status() == expected_status

        # TODO: assert for warnings

0 comments on commit 8852a8d

Please sign in to comment.