From 534053fdf0a51d88b3dfc814d3810a234ef376fa Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 9 Jan 2024 15:29:21 +0100 Subject: [PATCH] test JobTracker without ZkJobRegistry #632 --- openeogeotrellis/job_registry.py | 8 +-- tests/test_job_tracker_v2.py | 105 +++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/openeogeotrellis/job_registry.py b/openeogeotrellis/job_registry.py index 7d0fbac2f..77d07df9d 100644 --- a/openeogeotrellis/job_registry.py +++ b/openeogeotrellis/job_registry.py @@ -932,12 +932,12 @@ def get_all_jobs_before( return jobs def get_running_jobs(self) -> Iterator[Dict]: + # TODO: rename to more aptly named get_active_jobs? + # TODO: incorporate user_limit? if self.zk_job_registry: yield from self.zk_job_registry.get_running_jobs(parse_specification=True) - if self.elastic_job_registry: - return iter(self.elastic_job_registry.list_active_jobs()) - - return iter(()) + elif self.elastic_job_registry: + yield from self.elastic_job_registry.list_active_jobs() def set_results_metadata(self, job_id, user_id, costs: Optional[float], usage: dict, results_metadata: Dict[str, Any]): diff --git a/tests/test_job_tracker_v2.py b/tests/test_job_tracker_v2.py index f0bc44ba5..35e7f832e 100644 --- a/tests/test_job_tracker_v2.py +++ b/tests/test_job_tracker_v2.py @@ -1580,6 +1580,111 @@ def test_k8s_zookeeper_job_cost( assert caplog.record_tuples == [] + def test_k8s_no_zookeeper(self, k8s_mock, prometheus_mock, job_costs_calculator, batch_job_output_root, + elastic_job_registry, caplog, time_machine): + job_tracker = JobTracker( + app_state_getter=K8sStatusGetter(k8s_mock, prometheus_mock), + zk_job_registry=None, + principal="john@EXAMPLE.TEST", + keytab="test/openeo.keytab", + job_costs_calculator=job_costs_calculator, + output_root_dir=batch_job_output_root, + elastic_job_registry=elastic_job_registry, + ) + + # TODO: check if job entity in elastic_job_registry is ultimately patched with costs, usage and results_metadata + + caplog.set_level(logging.WARNING) + time_machine.move_to("2022-12-14T12:00:00Z", tick=False) + + user_id = "john" + job_id = "job-123" + elastic_job_registry.create_job( + job_id=job_id, user_id=user_id, process=DUMMY_PROCESS_1, + ) + + # Check initial status in registry + assert elastic_job_registry.db == { + "job-123": DictSubSet( + { + "job_id": "job-123", + "user_id": "john", + "status": "created", + "process": DUMMY_PROCESS_1, + "created": "2022-12-14T12:00:00Z", + "updated": "2022-12-14T12:00:00Z", + } + ) + } + + # Submit Kubernetes app + time_machine.coordinates.shift(70) + app_id = k8s_job_name() + kube_app = k8s_mock.submit(app_id=app_id) + kube_app.set_submitted() + elastic_job_registry.set_application_id(job_id=job_id, application_id=app_id) + + # Trigger `update_statuses` + job_tracker.update_statuses() + assert elastic_job_registry.db[job_id] == DictSubSet( + { + "status": "queued", + "created": "2022-12-14T12:00:00Z", + "updated": "2022-12-14T12:01:10Z", + } + ) + + # Set RUNNING IN Kubernetes + time_machine.coordinates.shift(70) + kube_app.set_running() + job_tracker.update_statuses() + assert elastic_job_registry.db[job_id] == DictSubSet( + { + "status": "running", + "created": "2022-12-14T12:00:00Z", + "updated": "2022-12-14T12:02:20Z", + "started": "2022-12-14T12:01:10Z", + } + ) + + # Set COMPLETED IN Kubernetes + time_machine.coordinates.shift(70) + kube_app.set_completed() + json_write( + path=job_tracker._batch_jobs.get_results_metadata_path(job_id=job_id), + data={ + "foo": "bar", + "usage": {"input_pixel": {"unit": "mega-pixel", "value": 1.125}, + "sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25},} + }, + ) + job_tracker.update_statuses() + assert elastic_job_registry.db[job_id] == DictSubSet( + { + "status": "finished", + "created": "2022-12-14T12:00:00Z", + "updated": "2022-12-14T12:03:30Z", + "started": "2022-12-14T12:01:10Z", + "finished": "2022-12-14T12:03:30Z", + "usage": { + "input_pixel": {"unit": "mega-pixel", "value": 1.125}, + "cpu": {"unit": "cpu-seconds", "value": pytest.approx(2.34 * 3600, rel=0.001)}, + "memory": {"unit": "mb-seconds", "value": pytest.approx(5.678 * 3600, rel=0.001)}, + "network_received": {"unit": "b", "value": pytest.approx(370841160371254.75, rel=0.001)}, + "sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25}, + }, + "costs": 129.95, + "results_metadata": { + "foo": "bar", + "usage": {"input_pixel": {"unit": "mega-pixel", "value": 1.125}, + "sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25},} + }, + } + ) + + assert caplog.record_tuples == [] + + class TestK8sStatusGetter: def test_cpu_and_memory_usage_not_in_prometheus(self, caplog): caplog.set_level(logging.WARNING)