Skip to content

Commit

Permalink
Merge branch 'master' into 632-phase-out-zkjobregistry
Browse files Browse the repository at this point in the history
  • Loading branch information
bossie committed Jan 18, 2024
2 parents f10fdc6 + e3c00eb commit 91b6348
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 38 deletions.
20 changes: 18 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,29 @@ without compromising stable operations.

## Unreleased

## 0.23.0

### Added

- Support disabling ZkJobRegistry ([#632](https://github.com/Open-EO/openeo-geopyspark-driver/issues/632))

## 0.22.3

### Bugfix

- Restore batch job result metadata; this reverts the Zookeeper fix introduced in 0.22.2

## 0.22.2

### Bugfix

- Prevent Zookeeper from blocking requests (https://github.com/Open-EO/openeo-geopyspark-driver/pull/639)

## 0.22.1

### Bugfix

- Prevent usage duplication in ETL API ([#41](https://github.com/eu-cdse/openeo-cdse-infra/issues/41))
- Prevent Zookeeper from blocking requests (https://github.com/Open-EO/openeo-geopyspark-driver/pull/639)
- Support disabling ZkJobRegistry ([#632](https://github.com/Open-EO/openeo-geopyspark-driver/issues/632))

## 0.22.0

Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2033,7 +2033,7 @@ def _start_job(self, job_id: str, user: User, get_vault_token: Callable[[str], s
else:
# New style job info (EJR based)
job_process_graph = job_info["process"]["process_graph"]
job_options = job_info.get("job_options") or {} # can be None
job_options = job_info.get("job_options", {})
job_specification_json = json.dumps({"process_graph": job_process_graph, "job_options": job_options})

job_title = job_info.get('title', '')
Expand Down
55 changes: 20 additions & 35 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,7 @@ def test_create_and_get_user_jobs(self, api):
}

@mock.patch("openeogeotrellis.logs.Elasticsearch.search")
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root,
job_registry):
def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeypatch, batch_job_output_root):
with self._mock_kazoo_client() as zk, \
self._mock_utcnow() as un, \
mock.patch.dict("os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}):
Expand Down Expand Up @@ -581,14 +580,11 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
assert res["logs"] == []

# Fake update from job tracker
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)

# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
meta_data = zk.get_json_decoded(
f"/openeo.test/jobs/ongoing/{TEST_USER}/{job_id}"
)
Expand All @@ -611,8 +607,9 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp
metadata = api.load_json(JOB_METADATA_FILENAME)
json.dump(metadata, f)

with dbl_job_registry as jr:
jr.set_status(
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED
)
res = (
Expand Down Expand Up @@ -668,7 +665,7 @@ def test_create_and_start_and_download(self, mock_search, api, tmp_path, monkeyp

assert res["logs"] == expected_log_entries

def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root, job_registry):
def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_root):
with self._mock_kazoo_client() as zk, self._mock_utcnow() as un, mock.patch.dict(
"os.environ", {"OPENEO_SPARK_SUBMIT_PY_FILES": "data/deps/custom_processes.py,data/deps/foolib.whl"}
):
Expand Down Expand Up @@ -744,13 +741,9 @@ def test_providers_present(self, api, tmp_path, monkeypatch, batch_job_output_ro
json.dump(job_metadata_contents, f)

# Fake update from job tracker
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.FINISHED)
res = api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "finished"

Expand Down Expand Up @@ -1073,13 +1066,8 @@ def test_cancel_job(self, api, job_registry):

# Fake running
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
res = api.get('/jobs/{j}'.format(j=job_id), headers=TEST_USER_AUTH_HEADER).assert_status_code(200).json
assert res["status"] == "running"

Expand Down Expand Up @@ -1132,13 +1120,11 @@ def test_delete_job(self, api, job_registry):
run.assert_called_once()

# Fake running
dbl_job_registry = DoubleJobRegistry(
# TODO #236/#498/#632 phase out ZkJobRegistry
zk_job_registry_factory=(lambda: ZkJobRegistry(zk_client=zk)),
elastic_job_registry=job_registry,
)
with dbl_job_registry as jr:
jr.set_status(job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING)
# TODO #236/#498/#632 eliminate direct dependency on deprecated ZkJobRegistry and related mocking (e.g. `self._mock_kazoo_client()` above)
with openeogeotrellis.job_registry.ZkJobRegistry() as reg:
reg.set_status(
job_id=job_id, user_id=TEST_USER, status=JOB_STATUS.RUNNING
)
res = (
api.get(f"/jobs/{job_id}", headers=TEST_USER_AUTH_HEADER)
.assert_status_code(200)
Expand Down Expand Up @@ -1543,7 +1529,6 @@ def test_api_job_results_contains_proj_metadata_at_item_level(self, api100, batc
}
]


class TestSentinelHubBatchJobs:
"""Tests for batch jobs involving SentinelHub collections and batch processes"""

Expand Down

0 comments on commit 91b6348

Please sign in to comment.