Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to select a pool for a job through a label #197

Merged
merged 9 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions batch/batch/driver/instance_collection/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def __init__(
self.data_disk_size_gb = config.data_disk_size_gb
self.data_disk_size_standing_gb = config.data_disk_size_standing_gb
self.preemptible = config.preemptible
self.label = config.label

@property
def local_ssd_data_disk(self) -> bool:
Expand All @@ -130,6 +131,7 @@ def config(self):
'max_instances': self.max_instances,
'max_live_instances': self.max_live_instances,
'preemptible': self.preemptible,
'label': self.label,
}

def configure(self, pool_config: PoolConfig):
Expand All @@ -148,6 +150,7 @@ def configure(self, pool_config: PoolConfig):
self.max_instances = pool_config.max_instances
self.max_live_instances = pool_config.max_live_instances
self.preemptible = pool_config.preemptible
self.label = pool_config.label

def adjust_for_remove_instance(self, instance):
super().adjust_for_remove_instance(instance)
Expand Down
3 changes: 3 additions & 0 deletions batch/batch/driver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ async def pool_config_update(request, userdata): # pylint: disable=unused-argum
session, 'Max live instances', post['max_live_instances'], lambda v: v > 0, 'a positive integer'
)

label = post['label']

enable_standing_worker = 'enable_standing_worker' in post

possible_worker_cores = []
Expand Down Expand Up @@ -537,6 +539,7 @@ async def pool_config_update(request, userdata): # pylint: disable=unused-argum
max_instances,
max_live_instances,
pool.preemptible,
label,
)
await pool_config.update_database(db)
pool.configure(pool_config)
Expand Down
1 change: 1 addition & 0 deletions batch/batch/driver/templates/pool.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ <h2>Configuration</h2>
<div>Standing worker cores: <input name="standing_worker_cores" value="{{ pool.standing_worker_cores }}" /></div>
<div>Max instances: <input name="max_instances" value="{{ pool.max_instances }}" /></div>
<div>Max live instances: <input name="max_live_instances" value="{{ pool.max_live_instances }}" /></div>
<div>Label: <input name="label" value="{{ pool.label }}" /></div>
<input type="hidden" name="_csrf" value="{{ csrf_token }}"/>
<button>
Update
Expand Down
6 changes: 5 additions & 1 deletion batch/batch/front_end/front_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh

worker_type = None
machine_type = resources.get('machine_type')
pool_label = resources.get('pool_label') or ''
preemptible = resources.get('preemptible', BATCH_JOB_DEFAULT_PREEMPTIBLE)

if machine_type and machine_type not in valid_machine_types(cloud):
Expand All @@ -894,6 +895,9 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh
if machine_type and ('cpu' in resources or 'memory' in resources):
raise web.HTTPBadRequest(reason='cannot specify cpu and memory with machine_type')

if machine_type and pool_label:
raise web.HTTPBadRequest(reason='cannot specify pool label with machine_type')

if spec['process']['type'] == 'jvm':
jvm_requested_cpu = parse_cpu_in_mcpu(resources.get('cpu', BATCH_JOB_DEFAULT_CPU))
if 'cpu' in resources and jvm_requested_cpu not in (1000, 8000):
Expand Down Expand Up @@ -961,7 +965,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh
inst_coll_configs: InstanceCollectionConfigs = app['inst_coll_configs']

result, exc = inst_coll_configs.select_inst_coll(
cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
cloud, machine_type, pool_label, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
)

if exc:
Expand Down
1 change: 1 addition & 0 deletions batch/batch/front_end/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
'cpu': regex(CPU_REGEXPAT, CPU_REGEX),
'storage': regex(STORAGE_REGEXPAT, STORAGE_REGEX),
'machine_type': str_type,
'pool_label': str_type,
'preemptible': bool_type,
}
),
Expand Down
19 changes: 13 additions & 6 deletions batch/batch/inst_coll_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def from_record(record):
max_instances=record['max_instances'],
max_live_instances=record['max_live_instances'],
preemptible=bool(record['preemptible']),
label=record['label'],
)

async def update_database(self, db: Database):
Expand All @@ -93,7 +94,8 @@ async def update_database(self, db: Database):
boot_disk_size_gb = %s,
max_instances = %s,
max_live_instances = %s,
preemptible = %s
preemptible = %s,
label = %s
WHERE pools.name = %s;
''',
(
Expand All @@ -106,6 +108,7 @@ async def update_database(self, db: Database):
self.max_instances,
self.max_live_instances,
self.preemptible,
self.label,
self.name,
),
)
Expand All @@ -124,6 +127,7 @@ def __init__(
max_instances: int,
max_live_instances: int,
preemptible: bool,
label: str,
):
self.name = name
self.cloud = cloud
Expand All @@ -137,6 +141,7 @@ def __init__(
self.max_instances = max_instances
self.max_live_instances = max_live_instances
self.preemptible = preemptible
self.label = label

def instance_config(self, product_versions: ProductVersions, location: str) -> InstanceConfig:
return instance_config_from_pool_config(self, product_versions, location)
Expand Down Expand Up @@ -273,13 +278,13 @@ async def refresh(self, db: Database):
self.resource_rates = resource_rates
self.product_versions.update(product_versions_data)

def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes, preemptible):
def select_pool_from_cost(self, cloud, pool_label, cores_mcpu, memory_bytes, storage_bytes, preemptible):
assert self.resource_rates is not None

optimal_result = None
optimal_cost = None
for pool in self.name_pool_config.values():
if pool.cloud != cloud or pool.preemptible != preemptible:
if pool.cloud != cloud or pool.preemptible != preemptible or pool.label != pool_label:
continue

result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes)
Expand All @@ -304,9 +309,9 @@ def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes,
optimal_result = (pool.name, maybe_cores_mcpu, maybe_memory_bytes, maybe_storage_gib)
return optimal_result

def select_pool_from_worker_type(self, cloud, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible):
def select_pool_from_worker_type(self, cloud, pool_label, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible):
for pool in self.name_pool_config.values():
if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible:
if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible and pool.label == pool_label:
result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes)
if result:
actual_cores_mcpu, actual_memory_bytes, acutal_storage_gib = result
Expand All @@ -319,11 +324,12 @@ def select_job_private(self, cloud, machine_type, storage_bytes):
return self.jpim_config.convert_requests_to_resources(machine_type, storage_bytes)

def select_inst_coll(
self, cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
self, cloud, machine_type, pool_label, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes
):
if worker_type is not None and machine_type is None:
result = self.select_pool_from_worker_type(
cloud=cloud,
pool_label=pool_label,
worker_type=worker_type,
cores_mcpu=req_cores_mcpu,
memory_bytes=req_memory_bytes,
Expand All @@ -333,6 +339,7 @@ def select_inst_coll(
elif worker_type is None and machine_type is None:
result = self.select_pool_from_cost(
cloud=cloud,
pool_label=pool_label,
cores_mcpu=req_cores_mcpu,
memory_bytes=req_memory_bytes,
storage_bytes=req_storage_bytes,
Expand Down
1 change: 1 addition & 0 deletions batch/sql/add-pool-label.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE `pools` ADD `label` VARCHAR(100) NOT NULL;
lgruen marked this conversation as resolved.
Show resolved Hide resolved
43 changes: 43 additions & 0 deletions batch/sql/add-seqr-pools.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Adds dedicated pools for the seqr loading pipeline, with the 'seqr' pool label.

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-standard', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'standard';

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-highmem', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'highmem';

INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`)
SELECT 'seqr-highcpu', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud
FROM inst_colls
WHERE name = 'highcpu';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-standard', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'standard';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-highmem', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'highmem';

INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`,
`worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`,
`preemptible`, `label`)
SELECT 'seqr-highcpu', worker_type, worker_cores, worker_local_ssd_data_disk,
worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores,
TRUE, 'seqr'
FROM pools
WHERE name = 'highcpu';
2 changes: 2 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,8 @@ steps:
script: /io/sql/kill-mjc-deadlocks.sql
- name: add-nonpreemptible-pools
script: /io/sql/add-nonpreemptible-pools.sql
- name: add-pool-label
script: /io/sql/add-pool-label.sql
inputs:
- from: /repo/batch/sql
to: /io/sql
Expand Down
2 changes: 2 additions & 0 deletions hail/python/hailtop/batch/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,8 @@ async def compile_job(job):
resources['storage'] = job._storage
if job._machine_type:
resources['machine_type'] = job._machine_type
if job._pool_label:
resources['pool_label'] = job._pool_label
if job._preemptible is not None:
resources['preemptible'] = job._preemptible

Expand Down
1 change: 1 addition & 0 deletions hail/python/hailtop/batch/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self,
self._always_run: bool = False
self._preemptible: Optional[bool] = None
self._machine_type: Optional[str] = None
self._pool_label: Optional[str] = None
self._timeout: Optional[Union[int, float]] = None
self._cloudfuse: List[Tuple[str, str, bool]] = []
self._env: Dict[str, str] = {}
Expand Down