Skip to content

Commit

Permalink
EJR: automatically inject job_id extra logging
Browse files Browse the repository at this point in the history
Related to #163
  • Loading branch information
soxofaan committed Oct 3, 2023
1 parent 25ee566 commit a940528
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 58 deletions.
108 changes: 54 additions & 54 deletions openeo_driver/jobregistry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import contextlib
import datetime as dt
import json
import logging
Expand Down Expand Up @@ -241,6 +242,19 @@ def set_user_agent(self):
user_agent += f"/{self._backend_id}"
self._session.headers["User-Agent"] = user_agent

@contextlib.contextmanager
def _with_extra_logging(self, **kwargs):
"""Context manager to temporarily add extra logging fields in a context"""
orig = self.logger
extra = kwargs
if isinstance(orig, logging.LoggerAdapter):
extra = {**orig.extra, **extra}
self.logger = logging.LoggerAdapter(logger=orig, extra=extra)
try:
yield
finally:
self.logger = orig

@property
def backend_id(self) -> str:
assert self._backend_id
Expand Down Expand Up @@ -282,13 +296,9 @@ def _do_request(
json: Union[dict, list, None] = None,
use_auth: bool = True,
expected_status: int = 200,
logging_extra: Optional[dict] = None,
) -> Union[dict, list, None]:
"""Do an HTTP request to Elastic Job Tracker service."""
with TimingLogger(
logger=(lambda m: self.logger.debug(m, extra=logging_extra)),
title=f"EJR Request `{method} {path}`",
):
with TimingLogger(logger=self.logger.debug, title=f"EJR Request `{method} {path}`"):
headers = {}
if use_auth:
access_token = self._cache.get_or_call(
Expand All @@ -300,10 +310,7 @@ def _do_request(
headers["Authorization"] = f"Bearer {access_token}"

url = url_join(self._api_url, path)
self.logger.debug(
f"Doing EJR request `{method} {url}` {headers.keys()=}",
extra=logging_extra,
)
self.logger.debug(f"Doing EJR request `{method} {url}` {headers.keys()=}")
if self._debug_show_curl:
curl_command = self._as_curl(method=method, url=url, data=json, headers=headers)
self.logger.debug(f"Equivalent curl command: {curl_command}")
Expand All @@ -315,10 +322,7 @@ def _do_request(
headers=headers,
timeout=self._REQUEST_TIMEOUT,
)
self.logger.debug(
f"EJR response on `{method} {path}`: {response.status_code!r}",
extra=logging_extra,
)
self.logger.debug(f"EJR response on `{method} {path}`: {response.status_code!r}")
if expected_status and response.status_code != expected_status:
raise EjrHttpError.from_response(response=response)
else:
Expand Down Expand Up @@ -382,48 +386,46 @@ def create_job(
"api_version": api_version,
# TODO: additional technical metadata, see https://github.com/Open-EO/openeo-api/issues/472
}
logging_extra = {"job_id": job_id}
self.logger.info(f"EJR creating {job_id=} {created=}", extra=logging_extra)
return self._do_request(
"POST",
"/jobs",
json=job_data,
expected_status=201,
logging_extra=logging_extra,
)
with self._with_extra_logging(job_id=job_id):
self.logger.info(f"EJR creating {job_id=} {created=}")
return self._do_request("POST", "/jobs", json=job_data, expected_status=201)

def get_job(self, job_id: str, fields: Optional[List[str]] = None) -> JobDict:
    """
    Look up a single job document by job id (scoped to this backend).

    :param job_id: job identifier to look up.
    :param fields: optional list of document fields to return
        (defaults to the full document, ``["*"]``).
    :return: the job document as a dict.
    :raises JobNotFoundException: when no job matches ``job_id``.
    :raises InternalException: when multiple jobs match ``job_id``
        (which should never happen and indicates data corruption).
    """
    # NOTE(review): this span was diff-garbled in the scrape (old and new
    # bodies interleaved, unbalanced braces); reconstructed here as the
    # post-commit version using the `_with_extra_logging` context manager.
    with self._with_extra_logging(job_id=job_id):
        self.logger.info(f"EJR get job data {job_id=}")
        query = {
            "bool": {
                "filter": [
                    # Scope to this backend: job ids are only unique per backend.
                    {"term": {"backend_id": self.backend_id}},
                    {"term": {"job_id": job_id}},
                ]
            }
        }

        # Return full document, by default
        jobs = self._search(query=query, fields=fields or ["*"])
        if len(jobs) == 1:
            job = jobs[0]
            assert job["job_id"] == job_id, f"{job['job_id']=} != {job_id=}"
            return job
        elif len(jobs) == 0:
            raise JobNotFoundException(job_id=job_id)
        else:
            # Multiple hits for one job id: log a truncated summary for debugging.
            summary = [{k: j.get(k) for k in ["user_id", "created"]} for j in jobs]
            self.logger.error(
                f"Found multiple ({len(jobs)}) jobs for {job_id=}: {repr_truncate(summary, width=200)}"
            )
            raise InternalException(message=f"Found {len(jobs)} jobs for {job_id=}")

def delete_job(self, job_id: str) -> None:
    """
    Delete a job document by job id.

    :param job_id: job identifier to delete.
    :raises JobNotFoundException: when the service responds with 404.
    :raises EjrHttpError: for other unexpected HTTP errors.
    """
    # NOTE(review): this span was diff-garbled in the scrape (old try-block
    # duplicated before the new with-block); reconstructed as the
    # post-commit version using the `_with_extra_logging` context manager.
    with self._with_extra_logging(job_id=job_id):
        try:
            self._do_request(method="DELETE", path=f"/jobs/{job_id}")
            self.logger.info(f"EJR deleted {job_id=}")
        except EjrHttpError as e:
            # Map the service's 404 to the domain-level "not found" exception.
            if e.status_code == 404:
                raise JobNotFoundException(job_id=job_id) from e
            raise e

def set_status(
self,
Expand All @@ -446,11 +448,9 @@ def set_status(

def _update(self, job_id: str, data: dict) -> JobDict:
    """
    Generic update method: PATCH the job document with the given fields.

    :param job_id: job identifier to update.
    :param data: partial job document (fields to set/overwrite).
    :return: the updated job document as returned by the service.
    """
    # NOTE(review): this span was diff-garbled in the scrape (old
    # `logging_extra` body interleaved with the new one); reconstructed
    # as the post-commit version using `_with_extra_logging`.
    with self._with_extra_logging(job_id=job_id):
        self.logger.info(f"EJR update {job_id=} {data=}")
        return self._do_request("PATCH", f"/jobs/{job_id}", json=data)

def set_dependencies(
self, job_id: str, dependencies: List[Dict[str, str]]
Expand Down
39 changes: 35 additions & 4 deletions tests/test_jobregistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,10 +606,6 @@ def test_job_id_logging(self, requests_mock, oidc_mock, ejr, caplog):
"""Check that job_id logging is passed through as logging extra in appropriate places"""
caplog.set_level(logging.DEBUG)

class Formatter:
def format(self, record: logging.LogRecord):
job_id = getattr(record, "job_id", None)
return f"{record.name}:{job_id}:{record.message}"

job_id = "j-123"

Expand All @@ -623,6 +619,12 @@ def patch_job(request, context):

requests_mock.post(f"{self.EJR_API_URL}/jobs", json=post_jobs)
requests_mock.patch(f"{self.EJR_API_URL}/jobs/{job_id}", json=patch_job)
requests_mock.delete(f"{self.EJR_API_URL}/jobs/{job_id}", status_code=200, content=b"")

class Formatter:
def format(self, record: logging.LogRecord):
job_id = getattr(record, "job_id", None)
return f"{record.name}:{job_id}:{record.message}"

with caplog_with_custom_formatter(caplog=caplog, format=Formatter()):
with time_machine.travel("2020-01-02 03:04:05+00", tick=False):
Expand All @@ -635,6 +637,9 @@ def patch_job(request, context):
with time_machine.travel("2020-01-02 03:44:55+00", tick=False):
ejr.set_status(job_id=job_id, status=JOB_STATUS.RUNNING)

with time_machine.travel("2020-01-03 12:00:00+00", tick=False):
ejr.delete_job(job_id=job_id)

logs = caplog.text.strip().split("\n")

for expected in [
Expand All @@ -652,5 +657,31 @@ def patch_job(request, context):
"openeo_driver.jobregistry.elastic:j-123:EJR update job_id='j-123' data={'status': 'running', 'updated': '2020-01-02T03:44:55Z'}",
"openeo_driver.jobregistry.elastic:j-123:EJR response on `PATCH /jobs/j-123`: 200",
"openeo_driver.jobregistry.elastic:j-123:EJR Request `PATCH /jobs/j-123`: end 2020-01-02 03:44:55, elapsed 0:00:00",
# delete
"openeo_driver.jobregistry.elastic:j-123:EJR Request `DELETE /jobs/j-123`: start 2020-01-03 12:00:00",
"openeo_driver.jobregistry.elastic:j-123:EJR deleted job_id='j-123'",
]:
assert expected in logs

def test_with_extra_logging(self, requests_mock, oidc_mock, ejr, caplog):
    """Test that "extra logging fields" (like job_id) do not leak outside of context"""
    caplog.set_level(logging.INFO)

    # Custom formatter that surfaces the `job_id` extra (if any) in the
    # rendered log line, so assertions below can check its presence/absence.
    class Formatter:
        def format(self, record: logging.LogRecord):
            job_id = getattr(record, "job_id", None)
            return f"{record.name} [{job_id}] {record.message}"

    with caplog_with_custom_formatter(caplog=caplog, format=Formatter()):
        # Trigger failure during _with_extra_logging
        # (500 on the search endpoint makes get_job raise mid-context,
        # exercising the `finally`-based logger restore).
        requests_mock.post(f"{self.EJR_API_URL}/jobs/search", status_code=500)
        with pytest.raises(EjrHttpError):
            _ = ejr.get_job(job_id="job-123")

        # Health check should not be logged with job id in logs
        requests_mock.get(f"{self.EJR_API_URL}/health", json={"ok": "yep"})
        ejr.health_check(use_auth=False, log=True)

    logs = caplog.text.strip().split("\n")
    # Inside the context: job id is injected into the log record.
    assert "openeo_driver.jobregistry.elastic [job-123] EJR get job data job_id='job-123'" in logs
    # Outside the context (even after the raised error): no job id leaks.
    assert "openeo_driver.jobregistry.elastic [None] EJR health check {'ok': 'yep'}" in logs

0 comments on commit a940528

Please sign in to comment.