From 9a895054539b13b0066a6a32c6d9ec18a0912d5e Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 29 Sep 2023 16:51:49 +0200 Subject: [PATCH] Issue #523/#498/#236 Fall back on EJR when no job metadata in ZK --- openeogeotrellis/job_registry.py | 15 ++++++--- openeogeotrellis/testing.py | 10 ++++-- tests/test_job_registry.py | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 8 deletions(-) diff --git a/openeogeotrellis/job_registry.py b/openeogeotrellis/job_registry.py index 5bf15ed66..e4ab30785 100644 --- a/openeogeotrellis/job_registry.py +++ b/openeogeotrellis/job_registry.py @@ -1,3 +1,4 @@ +import contextlib import datetime as dt import json import logging @@ -728,10 +729,12 @@ def get_job(self, job_id: str, user_id: str) -> dict: # TODO: eliminate get_job/get_job_metadata duplication? zk_job = ejr_job = None if self.zk_job_registry: - zk_job = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id) + with contextlib.suppress(JobNotFoundException): + zk_job = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id) if self.elastic_job_registry: with self._just_log_errors("get_job", job_id=job_id): - ejr_job = self.elastic_job_registry.get_job(job_id=job_id) + with contextlib.suppress(JobNotFoundException): + ejr_job = self.elastic_job_registry.get_job(job_id=job_id) self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job, ejr_job_info=ejr_job) return zk_job or ejr_job @@ -741,11 +744,13 @@ def get_job_metadata(self, job_id: str, user_id: str) -> BatchJobMetadata: zk_job_info = ejr_job_info = None if self.zk_job_registry: with TimingLogger(f"self.zk_job_registry.get_job({job_id=}, {user_id=})", logger=_log.debug): - zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id) + with contextlib.suppress(JobNotFoundException): + zk_job_info = self.zk_job_registry.get_job(job_id=job_id, user_id=user_id) if self.elastic_job_registry: with self._just_log_errors("get_job_metadata", job_id=job_id): with TimingLogger(f"self.elastic_job_registry.get_job({job_id=})", logger=_log.debug): - ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id) + with contextlib.suppress(JobNotFoundException): + ejr_job_info = self.elastic_job_registry.get_job(job_id=job_id) self._check_zk_ejr_job_info(job_id=job_id, zk_job_info=zk_job_info, ejr_job_info=ejr_job_info) job_metadata = zk_job_info_to_metadata(zk_job_info) if zk_job_info else ejr_job_info_to_metadata(ejr_job_info) @@ -760,7 +765,7 @@ def _check_zk_ejr_job_info(self, job_id: str, zk_job_info: Union[dict, None], ej if zk_job_info != ejr_job_info: self._log.warning(f"DoubleJobRegistry mismatch {zk_job_info=} {ejr_job_info=}") elif zk_job_info is None and ejr_job_info is None: - raise DoubleJobRegistryException(f"None of ZK/EJR have {job_id=}") + raise JobNotFoundException(job_id=job_id) def set_status(self, job_id: str, user_id: str, status: str) -> None: if self.zk_job_registry: diff --git a/openeogeotrellis/testing.py b/openeogeotrellis/testing.py index 9a1ad4d57..d2c000148 100644 --- a/openeogeotrellis/testing.py +++ b/openeogeotrellis/testing.py @@ -120,9 +120,13 @@ def set(self, path: Union[str, Path], value: bytes, version: int = -1): def delete(self, path: Union[str, Path], version: int = -1): path = Path(path) - self._get(path).assert_version(version) - parent = self._get(path.parent) - del parent.children[path.name] + znode = self._get(path).assert_version(version) + if znode is self.root: + # Special case: wipe everything, start over. + self.root = _ZNode() + else: + parent = self._get(path.parent) + del parent.children[path.name] def dump(self) -> Dict[str, bytes]: """Dump ZooKeeper data for inspection""" diff --git a/tests/test_job_registry.py b/tests/test_job_registry.py index 2f0f2a88d..d8a72e17a 100644 --- a/tests/test_job_registry.py +++ b/tests/test_job_registry.py @@ -301,6 +301,14 @@ def test_get_job(self, double_jr, caplog): assert caplog.messages == [] + def test_get_job_not_found(self, double_jr, caplog): + with double_jr: + with pytest.raises(JobNotFoundException): + _ = double_jr.get_job("j-nope", user_id="john") + with pytest.raises(JobNotFoundException): + _ = double_jr.get_job_metadata("j-nope", user_id="john") + assert caplog.messages == [] + def test_get_job_mismatch(self, double_jr, memory_jr, caplog): with double_jr: double_jr.create_job( @@ -381,6 +389,51 @@ def test_get_job_consistency( assert caplog.messages == [] + def test_get_job_deleted_from_zk(self, double_jr, caplog, zk_client, memory_jr): + """ + Make sure to fall back on EJR if no data found in ZK + https://github.com/Open-EO/openeo-geopyspark-driver/issues/523 + """ + with double_jr: + double_jr.create_job(job_id="j-123", user_id="john", process=self.DUMMY_PROCESS) + # Wipe Zookeeper db + zk_client.delete("/") + + job = double_jr.get_job("j-123", user_id="john") + job_metadata = double_jr.get_job_metadata("j-123", user_id="john") + + expected_job = { + "job_id": "j-123", + "user_id": "john", + "created": "2023-02-15T17:17:17Z", + "status": "created", + "updated": "2023-02-15T17:17:17Z", + "api_version": None, + "application_id": None, + "title": "John's job", + "description": None, + } + assert job == DictSubSet( + {"job_id": "j-123", "user_id": "john", "created": "2023-02-15T17:17:17Z", "status": "created"} + ) + assert job_metadata == BatchJobMetadata( + id="j-123", + status="created", + created=datetime.datetime(2023, 2, 15, 17, 17, 17), + process=dict( + process_graph={"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}, + title="dummy", + ), + job_options=None, + title=None, + description=None, + updated=datetime.datetime(2023, 2, 15, 17, 17, 17), + started=None, + finished=None, + ) + + assert caplog.messages == [] + def test_set_status(self, double_jr, zk_client, memory_jr, time_machine): with double_jr: double_jr.create_job(job_id="j-123", user_id="john", process=self.DUMMY_PROCESS)