Skip to content

Commit

Permalink
Align job registry implementations: omit process/job_options from get…
Browse files Browse the repository at this point in the history
…_user_jobs listings

Process graph (and job options) are not to be included in user job listings,
so align `ZkJobRegistry.get_user_jobs` to not produce then like `ElasticJobRegistry`

refs: eu-cdse/openeo-cdse-infra#141, #498
  • Loading branch information
soxofaan committed May 17, 2024
1 parent b71ea64 commit a632e1a
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions openeogeotrellis/job_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,22 @@ def get_running_jobs(self, *, user_limit: Optional[int] = 1000, parse_specificat
else:
stats["user_id without jobs"] += 1

def get_job(self, job_id: str, user_id: str, parse_specification: bool = False) -> dict:
def get_job(
self,
job_id: str,
user_id: str,
*,
parse_specification: bool = False,
omit_raw_specification: bool = False,
) -> dict:
"""Returns details of a job."""
job_info, _ = self._read(job_id, user_id, include_done=True, parse_specification=parse_specification)
job_info, _ = self._read(
job_id=job_id,
user_id=user_id,
include_done=True,
parse_specification=parse_specification,
omit_raw_specification=omit_raw_specification,
)
return job_info

def get_user_jobs(self, user_id: str) -> List[Dict]:
Expand All @@ -227,13 +240,19 @@ def get_user_jobs(self, user_id: str) -> List[Dict]:

try:
done_job_ids = self._zk.get_children(self._done(user_id))
jobs.extend([self.get_job(job_id, user_id) for job_id in done_job_ids])
jobs.extend(
self.get_job(job_id=job_id, user_id=user_id, parse_specification=False, omit_raw_specification=True)
for job_id in done_job_ids
)
except NoNodeError:
pass

try:
ongoing_job_ids = self._zk.get_children(self._ongoing(user_id))
jobs.extend([self.get_job(job_id, user_id) for job_id in ongoing_job_ids])
jobs.extend(
self.get_job(job_id=job_id, user_id=user_id, parse_specification=False, omit_raw_specification=True)
for job_id in ongoing_job_ids
)
except NoNodeError:
pass

Expand Down Expand Up @@ -336,11 +355,18 @@ def _create(self, job_info: Dict, done: bool = False) -> None:
self._zk.create(path, data, makepath=True)

def _read(
self, job_id: str, user_id: str, include_done: bool = False, parse_specification: bool = False
self,
job_id: str,
user_id: str,
*,
include_done: bool = False,
parse_specification: bool = False,
omit_raw_specification: bool = False,
) -> Tuple[Dict, int]:
"""
:param parse_specification: parse the (JSON encoded) "specification" field
and inject the process_graph (as `"process": {"process_graph": ...}`) and job_options as additional fields
:param omit_raw_specification: remove the original (raw) "specification" field from the result
"""
assert job_id, "Shouldn't be empty: job_id"
assert user_id, "Shouldn't be empty: user_id"
Expand Down Expand Up @@ -370,6 +396,9 @@ def _read(
if "job_options" not in job_info:
job_info["job_options"] = job_options

if omit_raw_specification:
del job_info["specification"]

return job_info, stat.version

def _update(self, job_info: Dict, version: int) -> None:
Expand Down

0 comments on commit a632e1a

Please sign in to comment.