Skip to content

Commit

Permalink
Add use_zk_job_registry config to optionally disable `ZkJobRegistry` usage #236/#498/#632
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jan 4, 2024
1 parent 4478f96 commit 5475236
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ without compromising stable operations.

## Unreleased

## 0.22.0

### Added

- Added config `use_zk_job_registry` (enabled by default) to optionally disable `ZkJobRegistry` usage (#236/#498/#632)

### Bugfix

- apply_neighborhood: fix error if overlap is null/None ([#519](https://github.com/Open-EO/openeo-python-client/issues/519))
Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.21.5a1"
__version__ = "0.22.0a1"
3 changes: 3 additions & 0 deletions openeogeotrellis/async_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def get_batch_jobs(batch_job_id: str, user_id: str) -> GpsBatchJobs:
while True:
time.sleep(DEPENDENCIES_POLL_INTERVAL_S)

# TODO #236/#498/#632 phase out ZkJobRegistry (or at least abstract it away)
with ZkJobRegistry() as registry:
job_info = registry.get_job(batch_job_id, user_id)

Expand All @@ -257,6 +258,7 @@ def get_batch_jobs(batch_job_id: str, user_id: str) -> GpsBatchJobs:
# TODO: retry in Nifi? How to mark this job as 'error' then?
log.exception("failed to handle polling job dependencies")

# TODO #236/#498/#632 phase out ZkJobRegistry (or at least abstract it away)
with ZkJobRegistry() as registry:
registry.set_status(batch_job_id, user_id, JOB_STATUS.ERROR)

Expand All @@ -267,6 +269,7 @@ def get_batch_jobs(batch_job_id: str, user_id: str) -> GpsBatchJobs:
f" {DEPENDENCIES_MAX_POLL_DELAY_S} s, aborting"
log.error(max_poll_delay_reached_error)

# TODO #236/#498/#632 phase out ZkJobRegistry (or at least abstract it away)
with ZkJobRegistry() as registry:
registry.set_status(batch_job_id, user_id, JOB_STATUS.ERROR)

Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ def __init__(
)

self._double_job_registry = DoubleJobRegistry(
zk_job_registry_factory=ZkJobRegistry, # TODO #236/#498 allow to disable this with config?
zk_job_registry_factory=ZkJobRegistry if get_backend_config().use_zk_job_registry else None,
elastic_job_registry=elastic_job_registry,
)

Expand Down
3 changes: 3 additions & 0 deletions openeogeotrellis/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class GpsBackendConfig(OpenEoBackendConfig):
default="/openeo", validator=attrs.validators.matches_re("^/.+"), converter=lambda s: s.rstrip("/")
)

# TODO #236/#498/#632 long term goal is to fully disable ZK job registry, but for now it's configurable.
use_zk_job_registry: bool = True

ejr_api: Optional[str] = os.environ.get("OPENEO_EJR_API")
ejr_backend_id: str = "unknown"
ejr_credentials_vault_path: Optional[str] = os.environ.get("OPENEO_EJR_CREDENTIALS_VAULT_PATH")
Expand Down
5 changes: 3 additions & 2 deletions openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from openeo_driver.util.logging import get_logging_config, setup_logging, LOG_HANDLER_STDERR_JSON
from openeo_driver.views import build_app
from openeogeotrellis import deploy
from openeogeotrellis.configparams import ConfigParams
from openeogeotrellis.config import get_backend_config
from openeogeotrellis.deploy import get_socket
from openeogeotrellis.job_registry import ZkJobRegistry

Expand Down Expand Up @@ -37,7 +37,8 @@ def main():
SparkContext.getOrCreate()

def setup_batch_jobs():
if not ConfigParams().is_ci_context:
if get_backend_config().use_zk_job_registry:
# TODO #236/#498/#632 Phase out ZkJobRegistry?
with ZkJobRegistry() as job_registry:
job_registry.ensure_paths()

Expand Down
6 changes: 5 additions & 1 deletion openeogeotrellis/job_tracker_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ def __init__(
elastic_job_registry: Optional[ElasticJobRegistry] = None
):
self._app_state_getter = app_state_getter
# TODO #236/#498/#632 make ZkJobRegistry optional
self._zk_job_registry = zk_job_registry
self._job_costs_calculator = job_costs_calculator
# TODO: inject GpsBatchJobs (instead of constructing it here and requiring all its constructor args to be present)
Expand All @@ -388,11 +389,12 @@ def __init__(

def update_statuses(self, fail_fast: bool = False) -> None:
"""Iterate through all known (ongoing) jobs and update their status"""
# TODO #236/#498/#632 make ZkJobRegistry optional
with self._zk_job_registry as zk_job_registry, StatsReporter(
name="JobTracker.update_statuses stats", report=_log.info
) as stats, TimingLogger("JobTracker.update_statuses", logger=_log.info):

# TODO: #236/#498 also/instead get jobs_to_track from EJR?
# TODO: #236/#498/#632 also/instead get jobs_to_track from EJR?
jobs_to_track = zk_job_registry.get_running_jobs(parse_specification=True)

for job_info in jobs_to_track:
Expand Down Expand Up @@ -453,6 +455,7 @@ def _sync_job_status(
user_id: str,
application_id: str,
job_info: dict,
# TODO #236/#498/#632 make ZkJobRegistry optional
zk_job_registry: ZkJobRegistry,
stats: collections.Counter,
):
Expand Down Expand Up @@ -611,6 +614,7 @@ def main(self, *, args: Optional[List[str]] = None):
# ZooKeeper Job Registry
zk_root_path = args.zk_job_registry_root_path
_log.info(f"Using {zk_root_path=}")
# TODO #236/#498/#632 make ZkJobRegistry optional
zk_job_registry = ZkJobRegistry(root_path=zk_root_path)

requests_session = requests_with_retry(total=3, backoff_factor=2)
Expand Down
14 changes: 14 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class TestBatchJobs:
@staticmethod
@contextlib.contextmanager
def _mock_kazoo_client() -> KazooClientMock:
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking of `KazooClient`
zk_client = KazooClientMock()
with mock.patch.object(openeogeotrellis.job_registry, 'KazooClient', return_value=zk_client):
yield zk_client
Expand Down Expand Up @@ -579,6 +580,7 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
assert res["logs"] == []

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
Expand All @@ -605,6 +607,7 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
metadata = api.load_json(JOB_METADATA_FILENAME)
json.dump(metadata, f)

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED
Expand Down Expand Up @@ -738,6 +741,7 @@ def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_ro
json.dump(job_metadata_contents, f)

# Fake update from job tracker
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
res = api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
Expand Down Expand Up @@ -812,6 +816,7 @@ def test_download_from_object_storage(
data = api.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -911,6 +916,7 @@ def test_download_without_object_storage(
data = api.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -1059,6 +1065,7 @@ def test_cancel_job(self, api, job_registry):
run.assert_called_once()

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = api.get('/jobs/{j}'.format(j=job_id), headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
Expand Down Expand Up @@ -1113,6 +1120,7 @@ def test_delete_job(self, api, job_registry):
run.assert_called_once()

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
Expand Down Expand Up @@ -1181,6 +1189,7 @@ def test_get_job_logs_skips_lines_with_empty_loglevel(self, mock_search, api):
data = api.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -1241,6 +1250,7 @@ def test_get_job_logs_with_connection_timeout(self, mock_search, api, caplog):
data = api.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -1361,6 +1371,7 @@ def test_api_job_results_contains_proj_metadata_at_asset_level(self, api, batch_
data = api.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -1469,6 +1480,7 @@ def test_api_job_results_contains_proj_metadata_at_item_level(self, api100, batc
data = api100.get_process_graph_dict(self.DUMMY_PROCESS_GRAPH, title="Dummy")
job_options = {}

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with ZkJobRegistry() as registry:
registry.register(
job_id=job_id,
Expand Down Expand Up @@ -2141,6 +2153,7 @@ def value_estimate(self):

# Fake update from job tracker
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk_client)),
elastic_job_registry=job_registry,
)
Expand Down Expand Up @@ -2345,6 +2358,7 @@ def test_download_pending_job_results(

# Fake update from job tracker
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk_client)),
elastic_job_registry=job_registry,
)
Expand Down

0 comments on commit 5475236

Please sign in to comment.