Merge pull request #197 from populationgenomics/pool-name
Allow selecting a pool for a job through a label
lgruen authored May 31, 2022
2 parents 8305632 + ac1e946 commit 6b70db7
Showing 11 changed files with 75 additions and 7 deletions.
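
For context, a minimal client-side usage sketch of the feature (assumptions are flagged in the comments: the ServiceBackend configuration, the batch and job names, and the direct assignment to the private `_pool_label` attribute, since this diff does not show a public setter):

    # Sketch only: this PR adds Job._pool_label and the 'pool_label' resource,
    # but a public setter is not part of this diff, so the private attribute is
    # assigned directly here for illustration.
    import hailtop.batch as hb

    b = hb.Batch(backend=hb.ServiceBackend(), name='seqr-loading-demo')  # hypothetical names

    j = b.new_job(name='annotate')
    j.cpu('2')
    j.memory('highmem')
    j._pool_label = 'seqr'  # route this job to pools whose `label` column is 'seqr'
    j.command('echo "runs on a seqr-labelled pool"')

    b.run()

The backend change below forwards `job._pool_label` as the `pool_label` resource, and the driver then only considers pools whose `label` matches; a job that sets no label (empty string) continues to match only unlabelled pools.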
3 changes: 3 additions & 0 deletions batch/batch/driver/instance_collection/pool.py
@@ -109,6 +109,7 @@ def __init__(
self.data_disk_size_gb = config.data_disk_size_gb
self.data_disk_size_standing_gb = config.data_disk_size_standing_gb
self.preemptible = config.preemptible
self.label = config.label

@property
def local_ssd_data_disk(self) -> bool:
@@ -130,6 +131,7 @@ def config(self):
'max_instances': self.max_instances,
'max_live_instances': self.max_live_instances,
'preemptible': self.preemptible,
'label': self.label,
}

def configure(self, pool_config: PoolConfig):
@@ -148,6 +150,7 @@ def configure(self, pool_config: PoolConfig):
self.max_instances = pool_config.max_instances
self.max_live_instances = pool_config.max_live_instances
self.preemptible = pool_config.preemptible
self.label = pool_config.label

def adjust_for_remove_instance(self, instance):
super().adjust_for_remove_instance(instance)
3 changes: 3 additions & 0 deletions batch/batch/driver/main.py
@@ -485,6 +485,8 @@ async def pool_config_update(request, userdata): # pylint: disable=unused-argum
session, 'Max live instances', post['max_live_instances'], lambda v: v > 0, 'a positive integer'
)

label = post['label']

enable_standing_worker = 'enable_standing_worker' in post

possible_worker_cores = []
@@ -537,6 +539,7 @@ async def pool_config_update(request, userdata): # pylint: disable=unused-argum
max_instances,
max_live_instances,
pool.preemptible,
label,
)
await pool_config.update_database(db)
pool.configure(pool_config)
1 change: 1 addition & 0 deletions batch/batch/driver/templates/pool.html
@@ -31,6 +31,7 @@ <h2>Configuration</h2>
<div>Standing worker cores: <input name="standing_worker_cores" value="{{ pool.standing_worker_cores }}" /></div>
<div>Max instances: <input name="max_instances" value="{{ pool.max_instances }}" /></div>
<div>Max live instances: <input name="max_live_instances" value="{{ pool.max_live_instances }}" /></div>
<div>Label: <input name="label" value="{{ pool.label }}" /></div>
<input type="hidden" name="_csrf" value="{{ csrf_token }}"/>
<button>
Update
6 changes: 5 additions & 1 deletion batch/batch/front_end/front_end.py
@@ -886,6 +886,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh

worker_type = None
machine_type = resources.get('machine_type')
pool_label = resources.get('pool_label') or ''
preemptible = resources.get('preemptible', BATCH_JOB_DEFAULT_PREEMPTIBLE)

if machine_type and machine_type not in valid_machine_types(cloud):
@@ -894,6 +895,9 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh
if machine_type and ('cpu' in resources or 'memory' in resources):
raise web.HTTPBadRequest(reason='cannot specify cpu and memory with machine_type')

if machine_type and pool_label:
raise web.HTTPBadRequest(reason='cannot specify pool label with machine_type')

if spec['process']['type'] == 'jvm':
jvm_requested_cpu = parse_cpu_in_mcpu(resources.get('cpu', BATCH_JOB_DEFAULT_CPU))
if 'cpu' in resources and jvm_requested_cpu not in (1000, 8000):
@@ -961,7 +965,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh
inst_coll_configs: InstanceCollectionConfigs = app['inst_coll_configs']

result, exc = inst_coll_configs.select_inst_coll(
cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
cloud, machine_type, pool_label, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
)

if exc:
1 change: 1 addition & 0 deletions batch/batch/front_end/validate.py
@@ -86,6 +86,7 @@
'cpu': regex(CPU_REGEXPAT, CPU_REGEX),
'storage': regex(STORAGE_REGEXPAT, STORAGE_REGEX),
'machine_type': str_type,
'pool_label': str_type,
'preemptible': bool_type,
}
),
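
On the job-spec side, `pool_label` is now an accepted string key under `resources`, and the front end rejects it in combination with `machine_type`. A sketch of the two cases (these dicts are hypothetical specs, not taken from the diff):

    # Accepted: pool_label alongside cpu/memory; a missing pool_label falls back to ''.
    ok_resources = {'cpu': '2', 'memory': 'standard', 'pool_label': 'seqr'}

    # Rejected with HTTP 400 Bad Request by front_end.py: pool_label cannot be
    # combined with an explicit machine_type.
    bad_resources = {'machine_type': 'n1-standard-8', 'pool_label': 'seqr'}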
19 changes: 13 additions & 6 deletions batch/batch/inst_coll_config.py
@@ -78,6 +78,7 @@ def from_record(record):
max_instances=record['max_instances'],
max_live_instances=record['max_live_instances'],
preemptible=bool(record['preemptible']),
label=record['label'],
)

async def update_database(self, db: Database):
@@ -93,7 +94,8 @@ async def update_database(self, db: Database):
boot_disk_size_gb = %s,
max_instances = %s,
max_live_instances = %s,
preemptible = %s
preemptible = %s,
label = %s
WHERE pools.name = %s;
''',
(
@@ -106,6 +108,7 @@ async def update_database(self, db: Database):
self.max_instances,
self.max_live_instances,
self.preemptible,
self.label,
self.name,
),
)
@@ -124,6 +127,7 @@ def __init__(
max_instances: int,
max_live_instances: int,
preemptible: bool,
label: str,
):
self.name = name
self.cloud = cloud
@@ -137,6 +141,7 @@ def __init__(
self.max_instances = max_instances
self.max_live_instances = max_live_instances
self.preemptible = preemptible
self.label = label

def instance_config(self, product_versions: ProductVersions, location: str) -> InstanceConfig:
return instance_config_from_pool_config(self, product_versions, location)
@@ -273,13 +278,13 @@ async def refresh(self, db: Database):
self.resource_rates = resource_rates
self.product_versions.update(product_versions_data)

def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes, preemptible):
def select_pool_from_cost(self, cloud, pool_label, cores_mcpu, memory_bytes, storage_bytes, preemptible):
assert self.resource_rates is not None

optimal_result = None
optimal_cost = None
for pool in self.name_pool_config.values():
if pool.cloud != cloud or pool.preemptible != preemptible:
if pool.cloud != cloud or pool.preemptible != preemptible or pool.label != pool_label:
continue

result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes)
@@ -304,9 +309,9 @@ def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes,
optimal_result = (pool.name, maybe_cores_mcpu, maybe_memory_bytes, maybe_storage_gib)
return optimal_result

def select_pool_from_worker_type(self, cloud, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible):
def select_pool_from_worker_type(self, cloud, pool_label, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible):
for pool in self.name_pool_config.values():
if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible:
if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible and pool.label == pool_label:
result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes)
if result:
actual_cores_mcpu, actual_memory_bytes, acutal_storage_gib = result
@@ -319,11 +324,12 @@ def select_job_private(self, cloud, machine_type, storage_bytes):
return self.jpim_config.convert_requests_to_resources(machine_type, storage_bytes)

def select_inst_coll(
self, cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
self, cloud, machine_type, pool_label, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
):
if worker_type is not None and machine_type is None:
result = self.select_pool_from_worker_type(
cloud=cloud,
pool_label=pool_label,
worker_type=worker_type,
cores_mcpu=req_cores_mcpu,
memory_bytes=req_memory_bytes,
@@ -333,6 +339,7 @@ def select_inst_coll(
elif worker_type is None and machine_type is None:
result = self.select_pool_from_cost(
cloud=cloud,
pool_label=pool_label,
cores_mcpu=req_cores_mcpu,
memory_bytes=req_memory_bytes,
storage_bytes=req_storage_bytes,
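
The net effect of the two changes above is one extra condition in the pool filter. A standalone sketch of the predicate both selection paths now apply (illustrative only; the real methods also convert the resource requests into per-pool cores, memory and storage):

    # Sketch of the matching rule from select_pool_from_worker_type / select_pool_from_cost:
    # a pool is a candidate only if cloud, preemptibility and label all match the request.
    def pool_matches(pool, cloud, preemptible, pool_label, worker_type=None):
        if worker_type is not None and pool.worker_type != worker_type:
            return False
        return pool.cloud == cloud and pool.preemptible == preemptible and pool.label == pool_label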
1 change: 1 addition & 0 deletions batch/sql/add-pool-label.sql
@@ -0,0 +1 @@
ALTER TABLE `pools` ADD `label` VARCHAR(100) NOT NULL;
43 changes: 43 additions & 0 deletions batch/sql/add-seqr-pools.sql
@@ -0,0 +1,43 @@
-- Adds dedicated pools for the seqr loading pipeline, with the 'seqr' pool label.

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-standard', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'standard';

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-highmem', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'highmem';

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-highcpu', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'highcpu';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-standard', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'standard';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-highmem', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'highmem';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-highcpu', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'highcpu';
2 changes: 2 additions & 0 deletions build.yaml
@@ -2003,6 +2003,8 @@ steps:
script: /io/sql/kill-mjc-deadlocks.sql
- name: add-nonpreemptible-pools
script: /io/sql/add-nonpreemptible-pools.sql
- name: add-pool-label
script: /io/sql/add-pool-label.sql
inputs:
- from: /repo/batch/sql
to: /io/sql
2 changes: 2 additions & 0 deletions hail/python/hailtop/batch/backend.py
@@ -657,6 +657,8 @@ async def compile_job(job):
resources['storage'] = job._storage
if job._machine_type:
resources['machine_type'] = job._machine_type
if job._pool_label:
resources['pool_label'] = job._pool_label
if job._preemptible is not None:
resources['preemptible'] = job._preemptible

1 change: 1 addition & 0 deletions hail/python/hailtop/batch/job.py
@@ -79,6 +79,7 @@ def __init__(self,
self._always_run: bool = False
self._preemptible: Optional[bool] = None
self._machine_type: Optional[str] = None
self._pool_label: Optional[str] = None
self._timeout: Optional[Union[int, float]] = None
self._cloudfuse: List[Tuple[str, str, bool]] = []
self._env: Dict[str, str] = {}
