Skip to content

Commit

Permalink
Merge pull request #333 from populationgenomics/upstream-14282
Browse files Browse the repository at this point in the history
Merge upstream HEAD (13de4e6, 2024-05-14) Add job groups [migration might take a while!]
  • Loading branch information
milo-hyben authored May 24, 2024
2 parents e864ba6 + b37df1e commit 4b8ee7f
Show file tree
Hide file tree
Showing 80 changed files with 4,518 additions and 1,395 deletions.
8 changes: 4 additions & 4 deletions auth/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import kubernetes_asyncio.client
import kubernetes_asyncio.client.rest
import kubernetes_asyncio.config
import uvloop
from aiohttp import web
from prometheus_async.aio.web import server_stats # type: ignore

Expand All @@ -33,7 +32,7 @@
from gear.auth import AIOHTTPHandler, get_session_id
from gear.cloud_config import get_global_config
from gear.profiling import install_profiler_if_requested
from hailtop import httpx
from hailtop import httpx, uvloopx
from hailtop.auth import AzureFlow, Flow, GoogleFlow, IdentityProvider
from hailtop.config import get_deploy_config
from hailtop.hail_logging import AccessLogger
Expand All @@ -56,8 +55,6 @@

log = logging.getLogger('auth')

uvloop.install()

CLOUD = get_global_config()['cloud']
DEFAULT_NAMESPACE = os.environ['HAIL_DEFAULT_NAMESPACE']

Expand Down Expand Up @@ -842,6 +839,7 @@ async def on_startup(app):
kubernetes_asyncio.config.load_incluster_config()
app[AppKeys.K8S_CLIENT] = kubernetes_asyncio.client.CoreV1Api()
exit_stack.push_async_callback(app[AppKeys.K8S_CLIENT].api_client.rest_client.pool_manager.close)

app[AppKeys.K8S_CACHE] = K8sCache(app[AppKeys.K8S_CLIENT])


Expand Down Expand Up @@ -886,6 +884,8 @@ async def auth_check_csrf_token(request: web.Request, handler: AIOHTTPHandler):


def run():
uvloopx.install()

install_profiler_if_requested('auth')

app = web.Application(middlewares=[auth_check_csrf_token, monitor_endpoints_middleware])
Expand Down
128 changes: 90 additions & 38 deletions batch/batch/batch.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

from gear import transaction
from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha
from hailtop.batch_client.globals import ROOT_JOB_GROUP_ID
from hailtop.batch_client.types import CostBreakdownEntry, GetJobGroupResponseV1Alpha, JobListEntryV1Alpha
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
from .exceptions import NonExistentBatchError, OpenBatchError
from .exceptions import NonExistentJobGroupError
from .utils import coalesce

log = logging.getLogger('batch')


def _maybe_time_msecs_str(t: Optional[int]) -> Optional[str]:
    """Format a millisecond timestamp via time_msecs_str, passing None through unchanged."""
    return None if t is None else time_msecs_str(t)


def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]:
    """Convert a resource-name -> cost mapping into a list of cost-breakdown entries."""
    entries: List[CostBreakdownEntry] = []
    for resource_name, resource_cost in cost_breakdown.items():
        entries.append({'resource': resource_name, 'cost': resource_cost})
    return entries

Expand All @@ -30,14 +37,9 @@ def batch_record_to_dict(record: Dict[str, Any]) -> Dict[str, Any]:
else:
state = 'running'

def _time_msecs_str(t):
if t:
return time_msecs_str(t)
return None

time_created = _time_msecs_str(record['time_created'])
time_closed = _time_msecs_str(record['time_closed'])
time_completed = _time_msecs_str(record['time_completed'])
time_created = _maybe_time_msecs_str(record['time_created'])
time_closed = _maybe_time_msecs_str(record['time_closed'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
Expand All @@ -50,7 +52,7 @@ def _time_msecs_str(t):
if cost_breakdown is not None:
cost_breakdown = cost_breakdown_to_dict(json.loads(cost_breakdown))

d = {
batch_response = {
'id': record['id'],
'user': record['user'],
'billing_project': record['billing_project'],
Expand All @@ -75,9 +77,55 @@ def _time_msecs_str(t):

attributes = json.loads(record['attributes'])
if attributes:
d['attributes'] = attributes
batch_response['attributes'] = attributes

return batch_response


def job_group_record_to_dict(record: Dict[str, Any]) -> GetJobGroupResponseV1Alpha:
    """Convert a job-group database row into a GetJobGroupResponseV1Alpha payload.

    Fix: removed a stray ``return d`` mid-function (a leftover diff-deletion
    artifact) that referenced an undefined name and made everything after the
    timestamp formatting unreachable.

    NOTE(review): assumes `record` carries the job_groups row columns read
    below (n_failed, cancelled, state, cost_breakdown, attributes, ...) —
    confirm against the SQL that produces it.
    """
    # Derive the overall state: any failure wins, then cancellation; a fully
    # complete group is a success, anything else is still running.
    if record['n_failed'] > 0:
        state = 'failure'
    elif record['cancelled'] or record['n_cancelled'] > 0:
        state = 'cancelled'
    elif record['state'] == 'complete':
        assert record['n_succeeded'] == record['n_jobs']
        state = 'success'
    else:
        state = 'running'

    time_created = _maybe_time_msecs_str(record['time_created'])
    time_completed = _maybe_time_msecs_str(record['time_completed'])

    if record['time_created'] and record['time_completed']:
        duration_ms = record['time_completed'] - record['time_created']
    else:
        duration_ms = None

    # cost_breakdown is stored as a JSON string; decode it into entry dicts.
    if record['cost_breakdown'] is not None:
        record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

    job_group_response = {
        'batch_id': record['batch_id'],
        'job_group_id': record['job_group_id'],
        'state': state,
        'complete': record['state'] == 'complete',
        'n_jobs': record['n_jobs'],
        'n_completed': record['n_completed'],
        'n_succeeded': record['n_succeeded'],
        'n_failed': record['n_failed'],
        'n_cancelled': record['n_cancelled'],
        'time_created': time_created,
        'time_completed': time_completed,
        'duration': duration_ms,
        'cost': coalesce(record['cost'], 0),
        'cost_breakdown': record['cost_breakdown'],
    }

    # Attributes are optional; only attach them when non-empty.
    attributes = json.loads(record['attributes'])
    if attributes:
        job_group_response['attributes'] = attributes

    return cast(GetJobGroupResponseV1Alpha, job_group_response)


def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
Expand All @@ -95,38 +143,42 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEn
if cost_breakdown is not None:
cost_breakdown = cost_breakdown_to_dict(json.loads(cost_breakdown))

return {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record.get('cost'), 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': cost_breakdown,
}


async def cancel_batch_in_db(db, batch_id):
return cast(
JobListEntryV1Alpha,
{
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
'user': record['user'],
'billing_project': record['billing_project'],
'state': record['state'],
'exit_code': exit_code,
'duration': duration,
'cost': coalesce(record.get('cost'), 0),
'msec_mcpu': record['msec_mcpu'],
'cost_breakdown': cost_breakdown,
},
)


async def cancel_job_group_in_db(db, batch_id, job_group_id):
    """Cancel one job group of a batch inside a single DB transaction.

    Fix: the block contained interleaved remnants of the old
    ``cancel_batch_in_db`` (its SQL, ``(batch_id,)`` params,
    ``NonExistentBatchError``/``OpenBatchError`` raises and
    ``CALL cancel_batch``) merged into the new job-group version, leaving it
    syntactically invalid. Reconstructed the coherent job-group version.

    Raises:
        NonExistentJobGroupError: if the (batch_id, job_group_id) pair does
            not exist, the batch is deleted, or the group belongs to an
            uncommitted update (the root job group is always visible).
    """

    @transaction(db)
    async def cancel(tx):
        # Lock the job-group row (FOR UPDATE) so the existence check and the
        # cancellation procedure run atomically within this transaction.
        record = await tx.execute_and_fetchone(
            """
SELECT 1
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN batch_updates ON job_groups.batch_id = batch_updates.batch_id AND
  job_groups.update_id = batch_updates.update_id
WHERE job_groups.batch_id = %s AND job_groups.job_group_id = %s AND NOT deleted AND (batch_updates.committed OR job_groups.job_group_id = %s)
FOR UPDATE;
""",
            (batch_id, job_group_id, ROOT_JOB_GROUP_ID),
        )
        if not record:
            raise NonExistentJobGroupError(batch_id, job_group_id)

        await tx.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, job_group_id))

    await cancel()
1 change: 0 additions & 1 deletion batch/batch/constants.py

This file was deleted.

Loading

0 comments on commit 4b8ee7f

Please sign in to comment.