Skip to content

Commit

Permalink
Expose job resource in batch API (#329)
Browse files Browse the repository at this point in the history
* Expose job resource dataframe in batch API

* Testing fixes

* Apply suggestions from code review

Co-authored-by: John Marshall <john.marshall@populationgenomics.org.au>

---------

Co-authored-by: Michael Franklin <illusional@users.noreply.github.com>
Co-authored-by: John Marshall <john.marshall@populationgenomics.org.au>
  • Loading branch information
3 people authored Feb 22, 2024
1 parent be23273 commit 8f6797b
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,59 @@ async def get_jobs_for_billing(request, userdata, batch_id):
return web.json_response(resp)


@routes.get('/api/v1alpha/batches/{batch_id}/jobs/{job_id}/resource_usage')
@billing_project_users_only()
async def get_job_resource_usage(request: web.Request, _, batch_id: int) -> web.Response:
"""
Get the resource_usage data for a job. The data is returned as a JSON object
transformed from a pandas DataFrame using the 'split' orientation.
Returns
-------
Example response:
{
// eg: input, main, output
"[job_stage]": {
"columns":[
"time_msecs",
"memory_in_bytes",
"cpu_usage",
"non_io_storage_in_bytes",
"io_storage_in_bytes",
"network_bandwidth_upload_in_bytes_per_second",
"network_bandwidth_download_in_bytes_per_second"
],
"index":[0, 1, ...],
"data": [[<records>]],
}, ...
}
"""

# pull this out separately as billing_project_users_only() does a permission
# check for us, but has a fixed signature
job_id = int(request.match_info['job_id'])

job_record = await _get_job_record(request.app, batch_id, job_id)

# effectively the auth check
if not job_record:
raise web.HTTPNotFound()

resources: Optional[Dict[str, Optional[pd.DataFrame]]] = await _get_job_resource_usage_from_record(
app=request.app, record=job_record, batch_id=batch_id, job_id=job_id
)

if not resources:
# empty response if not available yet
return web.json_response({})

return web.json_response({
stage: stage_resource.to_dict(orient='split')
for stage, stage_resource in resources.items()
if stage_resource is not None
})


async def _get_job_record(app, batch_id, job_id):
db: Database = app['db']

Expand Down Expand Up @@ -636,8 +689,14 @@ async def _get_job_log(app, batch_id, job_id) -> Dict[str, Optional[bytes]]:
return dict(zip(containers, logs))


async def _get_job_resource_usage(app, batch_id, job_id) -> Optional[Dict[str, Optional[pd.DataFrame]]]:
async def _get_job_resource_usage(app, batch_id: int, job_id: int) -> Optional[Dict[str, Optional[pd.DataFrame]]]:
record = await _get_job_record(app, batch_id, job_id)
return await _get_job_resource_usage_from_record(app, record, batch_id=batch_id, job_id=job_id)


async def _get_job_resource_usage_from_record(
app, record, batch_id: int, job_id: int
) -> Optional[Dict[str, Optional[pd.DataFrame]]]:

client_session: httpx.ClientSession = app['client_session']
file_store: FileStore = app['file_store']
Expand Down

0 comments on commit 8f6797b

Please sign in to comment.