Skip to content

Commit

Permalink
test JobTracker without ZkJobRegistry #632
Browse files Browse the repository at this point in the history
  • Loading branch information
bossie committed Jan 9, 2024
1 parent 210f85a commit 534053f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 4 deletions.
8 changes: 4 additions & 4 deletions openeogeotrellis/job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,12 +932,12 @@ def get_all_jobs_before(
return jobs

def get_running_jobs(self) -> Iterator[Dict]:
# TODO: rename to more aptly named get_active_jobs?
# TODO: incorporate user_limit?
if self.zk_job_registry:
yield from self.zk_job_registry.get_running_jobs(parse_specification=True)
if self.elastic_job_registry:
return iter(self.elastic_job_registry.list_active_jobs())

return iter(())
elif self.elastic_job_registry:
yield from self.elastic_job_registry.list_active_jobs()

def set_results_metadata(self, job_id, user_id, costs: Optional[float], usage: dict,
results_metadata: Dict[str, Any]):
Expand Down
105 changes: 105 additions & 0 deletions tests/test_job_tracker_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,111 @@ def test_k8s_zookeeper_job_cost(

assert caplog.record_tuples == []

def test_k8s_no_zookeeper(self, k8s_mock, prometheus_mock, job_costs_calculator, batch_job_output_root,
elastic_job_registry, caplog, time_machine):
job_tracker = JobTracker(
app_state_getter=K8sStatusGetter(k8s_mock, prometheus_mock),
zk_job_registry=None,
principal="john@EXAMPLE.TEST",
keytab="test/openeo.keytab",
job_costs_calculator=job_costs_calculator,
output_root_dir=batch_job_output_root,
elastic_job_registry=elastic_job_registry,
)

# TODO: check if job entity in elastic_job_registry is ultimately patched with costs, usage and results_metadata

caplog.set_level(logging.WARNING)
time_machine.move_to("2022-12-14T12:00:00Z", tick=False)

user_id = "john"
job_id = "job-123"
elastic_job_registry.create_job(
job_id=job_id, user_id=user_id, process=DUMMY_PROCESS_1,
)

# Check initial status in registry
assert elastic_job_registry.db == {
"job-123": DictSubSet(
{
"job_id": "job-123",
"user_id": "john",
"status": "created",
"process": DUMMY_PROCESS_1,
"created": "2022-12-14T12:00:00Z",
"updated": "2022-12-14T12:00:00Z",
}
)
}

# Submit Kubernetes app
time_machine.coordinates.shift(70)
app_id = k8s_job_name()
kube_app = k8s_mock.submit(app_id=app_id)
kube_app.set_submitted()
elastic_job_registry.set_application_id(job_id=job_id, application_id=app_id)

# Trigger `update_statuses`
job_tracker.update_statuses()
assert elastic_job_registry.db[job_id] == DictSubSet(
{
"status": "queued",
"created": "2022-12-14T12:00:00Z",
"updated": "2022-12-14T12:01:10Z",
}
)

# Set RUNNING IN Kubernetes
time_machine.coordinates.shift(70)
kube_app.set_running()
job_tracker.update_statuses()
assert elastic_job_registry.db[job_id] == DictSubSet(
{
"status": "running",
"created": "2022-12-14T12:00:00Z",
"updated": "2022-12-14T12:02:20Z",
"started": "2022-12-14T12:01:10Z",
}
)

# Set COMPLETED IN Kubernetes
time_machine.coordinates.shift(70)
kube_app.set_completed()
json_write(
path=job_tracker._batch_jobs.get_results_metadata_path(job_id=job_id),
data={
"foo": "bar",
"usage": {"input_pixel": {"unit": "mega-pixel", "value": 1.125},
"sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25},}
},
)
job_tracker.update_statuses()
assert elastic_job_registry.db[job_id] == DictSubSet(
{
"status": "finished",
"created": "2022-12-14T12:00:00Z",
"updated": "2022-12-14T12:03:30Z",
"started": "2022-12-14T12:01:10Z",
"finished": "2022-12-14T12:03:30Z",
"usage": {
"input_pixel": {"unit": "mega-pixel", "value": 1.125},
"cpu": {"unit": "cpu-seconds", "value": pytest.approx(2.34 * 3600, rel=0.001)},
"memory": {"unit": "mb-seconds", "value": pytest.approx(5.678 * 3600, rel=0.001)},
"network_received": {"unit": "b", "value": pytest.approx(370841160371254.75, rel=0.001)},
"sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25},
},
"costs": 129.95,
"results_metadata": {
"foo": "bar",
"usage": {"input_pixel": {"unit": "mega-pixel", "value": 1.125},
"sentinelhub": {"unit": "sentinelhub_processing_unit", "value": 1.25},}
},
}
)

assert caplog.record_tuples == []


class TestK8sStatusGetter:
def test_cpu_and_memory_usage_not_in_prometheus(self, caplog):
caplog.set_level(logging.WARNING)
Expand Down

0 comments on commit 534053f

Please sign in to comment.