PR #638: JobTracker requires application IDs (#632)

Merged (1 commit), Jan 15, 2024

In short: the job tracker previously had to detect and skip jobs that did not yet have a YARN application id. With this change, get_running_jobs in job_registry.py only yields jobs that already carry an application_id, so the tracker can assume one is always present; the tracker's skip logic and the test covering it are removed, and output_string is renamed to script_output in backend.py.
openeogeotrellis/backend.py (18 changes: 9 additions & 9 deletions)

@@ -2413,28 +2413,28 @@ def as_arg_element(dependency: dict) -> dict:
                 f.write(job_specification_json)
             # Generate our own random application id.
             application_id = f"{random.randint(1000000000000, 9999999999999)}_{random.randint(1000000, 9999999)}"
-            output_string = f"Application report for application_{application_id} (state: running)"
+            script_output = f"Application report for application_{application_id} (state: running)"
         else:
             try:
                 log.info(f"Submitting job with command {args!r}")
-                output_string = subprocess.check_output(args, stderr=subprocess.STDOUT, universal_newlines=True)
-                log.info(f"Submitted job, output was: {output_string}")
+                script_output = subprocess.check_output(args, stderr=subprocess.STDOUT, universal_newlines=True)
+                log.info(f"Submitted job, output was: {script_output}")
             except CalledProcessError as e:
                 log.error(f"Submitting job failed, output was: {e.stdout}", exc_info=True)
                 raise InternalException(message=f"Failed to start batch job (YARN submit failure).")

         try:
-            application_id = self._extract_application_id(output_string)
+            application_id = self._extract_application_id(script_output)
             log.info("mapped job_id %s to application ID %s" % (job_id, application_id))

             with self._double_job_registry as dbl_registry:
                 dbl_registry.set_application_id(job_id, user_id, application_id)
                 dbl_registry.set_status(job_id, user_id, JOB_STATUS.QUEUED)

-        except _BatchJobError as e:
+        except _BatchJobError:
             traceback.print_exc(file=sys.stderr)
             # TODO: why reraise as CalledProcessError?
-            raise CalledProcessError(1, str(args), output=output_string)
+            raise CalledProcessError(1, str(args), output=script_output)

     def _write_sensitive_values(self, output_file, vault_token: Optional[str]):
         output_file.write(f"spark.openeo.sentinelhub.client.id.default={self._default_sentinel_hub_client_id}\n")
@@ -2444,13 +2444,13 @@ def _write_sensitive_values(self, output_file, vault_token: Optional[str]):
         output_file.write(f"spark.openeo.vault.token={vault_token}\n")

     @staticmethod
-    def _extract_application_id(stream) -> str:
+    def _extract_application_id(script_output: str) -> str:
         regex = re.compile(r"^.*Application report for (application_\d{13}_\d+)\s\(state:.*", re.MULTILINE)
-        match = regex.search(stream)
+        match = regex.search(script_output)
         if match:
             return match.group(1)
         else:
-            raise _BatchJobError(stream)
+            raise _BatchJobError(script_output)

     # TODO: encapsulate this SHub stuff in a dedicated class?
     def _schedule_and_get_dependencies(  # some we schedule ourselves, some already exist
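The regex used by `_extract_application_id` is unchanged by this PR; only the parameter name and type hint are. For reference, here is a self-contained sketch of that extraction. The pattern is copied verbatim from the diff, while the module-level `extract_application_id` wrapper and the `sample` line are illustrative stand-ins, not part of the PR:

```python
import re

# Pattern copied from _extract_application_id: capture the YARN application id
# ("application_" + 13-digit cluster timestamp + "_" + sequence number) from
# the "Application report" line that a YARN submit prints.
_APP_ID_RE = re.compile(
    r"^.*Application report for (application_\d{13}_\d+)\s\(state:.*",
    re.MULTILINE,
)

def extract_application_id(script_output: str) -> str:
    match = _APP_ID_RE.search(script_output)
    if match:
        return match.group(1)
    raise ValueError(f"no application id in {script_output!r}")

# Illustrative submit-output line (not taken from the PR):
sample = "INFO Client: Application report for application_1705312800000_0042 (state: ACCEPTED)"
assert extract_application_id(sample) == "application_1705312800000_0042"
```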
openeogeotrellis/job_registry.py (6 changes: 4 additions & 2 deletions)

@@ -199,8 +199,10 @@ def get_running_jobs(self, *, user_limit: Optional[int] = 1000, parse_specification
             if job_ids:
                 stats["user_id with jobs"] += 1
                 for job_id in job_ids:
-                    yield self.get_job(job_id, user_id, parse_specification=parse_specification)
-                    stats["job_ids"] += 1
+                    job_info = self.get_job(job_id, user_id, parse_specification=parse_specification)
+                    if job_info.get("application_id"):
+                        yield job_info
+                    stats["job_ids"] += 1
             else:
                 stats["user_id without jobs"] += 1
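The functional change here: jobs that were created but never submitted, and hence have no application_id in the registry, are no longer yielded to callers at all. A minimal sketch of that filter under hypothetical names (the real method walks the per-user job lists and keeps stats):

```python
from typing import Iterable, Iterator

def filter_running_jobs(job_infos: Iterable[dict]) -> Iterator[dict]:
    """Yield only jobs that were actually submitted, i.e. carry an application_id."""
    for job_info in job_infos:
        if job_info.get("application_id"):
            yield job_info

jobs = [
    {"job_id": "j-1", "application_id": "application_1705312800000_0042"},
    {"job_id": "j-2"},  # created but not started yet: now filtered out
]
assert [j["job_id"] for j in filter_running_jobs(jobs)] == ["j-1"]
```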
openeogeotrellis/job_tracker_v2.py (19 changes: 1 addition & 18 deletions)

@@ -425,26 +425,9 @@ def update_statuses(self, fail_fast: bool = False) -> None:
             job_id = job_info["job_id"]
             user_id = job_info["user_id"]
+            application_id = job_info["application_id"]

             try:
-                application_id = job_info.get("application_id")
-                status = job_info.get("status")
-
-                if not application_id:
-                    # No application_id typically means that job hasn't been started yet.
-                    created = job_info.get("created")
-                    if created:
-                        age = dt.datetime.utcnow() - rfc3339.parse_datetime(created)
-                    else:
-                        age = "unknown"
-                    # TODO: handle very old, non-started jobs? E.g. mark as error?
-                    _log.info(
-                        f"Skipping job without application_id: {job_id=}, {created=}, {age=}, {status=}",
-                        extra={"job_id": job_id, "user_id": user_id}
-                    )
-                    stats[f"skip due to no application_id ({status=})"] += 1
-                    continue
-
                 self._sync_job_status(
                     job_id=job_id,
                     user_id=user_id,
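Since get_running_jobs now does the filtering (see job_registry.py above), update_statuses can assume every job_info it receives has an application_id, which makes the defensive skip block dead code. A hypothetical sketch of the resulting invariant:

```python
def update_statuses(job_infos: list) -> None:
    # Every job_info comes from get_running_jobs, which only yields jobs
    # with an application_id, so plain indexing is safe here: a KeyError
    # would indicate a registry bug rather than a not-yet-started job.
    for job_info in job_infos:
        application_id = job_info["application_id"]
        print(f"syncing {job_info['job_id']} via {application_id}")
```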
tests/test_job_tracker_v2.py (82 changes: 0 additions & 82 deletions)

@@ -699,88 +699,6 @@ def test_yarn_zookeeper_unexpected_yarn_error(
             )
         ]

-    def test_yarn_zookeeper_no_app_id(
-        self,
-        zk_job_registry,
-        yarn_mock,
-        job_tracker,
-        elastic_job_registry,
-        caplog,
-        time_machine,
-    ):
-        caplog.set_level(logging.INFO)
-
-        time_machine.move_to("2022-12-14T12:00:00Z", tick=False)
-
-        # Job without app id (not started yet)
-        user_id = "john"
-        job_id = "job-123"
-        zk_job_registry.register(
-            job_id=job_id,
-            user_id=user_id,
-            api_version="1.2.3",
-            specification=ZkJobRegistry.build_specification_dict(
-                process_graph=DUMMY_PG_1, job_options=DUMMY_JOB_OPTIONS
-            ),
-        )
-        elastic_job_registry.create_job(
-            job_id=job_id, user_id=user_id, process=DUMMY_PROCESS_1, job_options=DUMMY_JOB_OPTIONS
-        )
-
-        # Another job that has an app id (already running)
-        zk_job_registry.register(
-            job_id=job_id + "-other",
-            user_id=user_id,
-            api_version="1.2.3",
-            specification=ZkJobRegistry.build_specification_dict(
-                process_graph=DUMMY_PG_1, job_options=DUMMY_JOB_OPTIONS
-            ),
-        )
-        elastic_job_registry.create_job(
-            job_id=job_id + "-other", user_id=user_id, process=DUMMY_PROCESS_1, job_options=DUMMY_JOB_OPTIONS
-        )
-        app_other = yarn_mock.submit(app_id="app-123-other").set_running()
-        zk_job_registry.set_application_id(
-            job_id=job_id + "-other", user_id=user_id, application_id=app_other.app_id
-        )
-
-        def zk_job_info() -> dict:
-            return zk_job_registry.get_job(job_id=job_id, user_id=user_id)
-
-        # Trigger `update_statuses` a bit later
-        time_machine.move_to("2022-12-14T12:30:00Z", tick=False)
-        job_tracker.update_statuses()
-        assert zk_job_info() == DictSubSet(
-            {
-                "status": "created",
-                "created": "2022-12-14T12:00:00Z",
-            }
-        )
-        assert elastic_job_registry.db[job_id] == DictSubSet(
-            {
-                "status": "created",
-                "created": "2022-12-14T12:00:00Z",
-                "updated": "2022-12-14T12:00:00Z",
-            }
-        )
-
-        assert "ERROR" not in caplog.text
-        assert caplog.text == re_assert.Matches(
-            ".*Skipping job without application_id: job_id='job-123'.*age.*seconds=1800.*status='created'",
-            flags=re.DOTALL,
-        )
-
-        [stats] = _extract_update_statuses_stats(caplog)
-        assert stats == {
-            "collected jobs": 2,
-            "skip due to no application_id (status='created')": 1,
-            "get metadata attempt": 1,
-            "job with previous_status='created'": 1,
-            "new metadata": 1,
-            "status change": 1,
-            "status change 'created' -> 'running'": 1,
-        }
-
     def test_yarn_zookeeper_yarn_failed_to_launch_container(
         self,
         zk_job_registry,