From 068f803f9fa0f9069c6cb3f9e567d8ec576e6f43 Mon Sep 17 00:00:00 2001 From: Leonhard Gruenschloss Date: Thu, 26 May 2022 16:48:39 +1000 Subject: [PATCH 1/9] Allow specifying the pool name explicitly --- batch/batch/front_end/front_end.py | 6 +++++- batch/batch/inst_coll_config.py | 13 ++++++++++--- hail/python/hailtop/batch/backend.py | 2 ++ hail/python/hailtop/batch/job.py | 1 + 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index f9c9e1ac3d0..000185598db 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -886,6 +886,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh worker_type = None machine_type = resources.get('machine_type') + pool_name = resources.get('pool_name') preemptible = resources.get('preemptible', BATCH_JOB_DEFAULT_PREEMPTIBLE) if machine_type and machine_type not in valid_machine_types(cloud): @@ -894,6 +895,9 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh if machine_type and ('cpu' in resources or 'memory' in resources): raise web.HTTPBadRequest(reason='cannot specify cpu and memory with machine_type') + if machine_type and pool_name: + raise web.HTTPBadRequest(reason='cannot specify pool name with machine_type') + if spec['process']['type'] == 'jvm': jvm_requested_cpu = parse_cpu_in_mcpu(resources.get('cpu', BATCH_JOB_DEFAULT_CPU)) if 'cpu' in resources and jvm_requested_cpu not in (1000, 8000): @@ -961,7 +965,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh inst_coll_configs: InstanceCollectionConfigs = app['inst_coll_configs'] result, exc = inst_coll_configs.select_inst_coll( - cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes + cloud, machine_type, pool_name, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes ) if exc: diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 53d014da949..9f43da5a972 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -273,12 +273,15 @@ async def refresh(self, db: Database): self.resource_rates = resource_rates self.product_versions.update(product_versions_data) - def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes, preemptible): + def select_pool_from_cost(self, cloud, pool_name, cores_mcpu, memory_bytes, storage_bytes, preemptible): assert self.resource_rates is not None optimal_result = None optimal_cost = None for pool in self.name_pool_config.values(): + if pool_name and pool.name != pool_name: + continue + if pool.cloud != cloud or pool.preemptible != preemptible: continue @@ -304,8 +307,10 @@ def select_pool_from_cost(self, cloud, cores_mcpu, memory_bytes, storage_bytes, optimal_result = (pool.name, maybe_cores_mcpu, maybe_memory_bytes, maybe_storage_gib) return optimal_result - def select_pool_from_worker_type(self, cloud, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible): + def select_pool_from_worker_type(self, cloud, pool_name, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible): for pool in self.name_pool_config.values(): + if pool_name and pool.name != pool_name: + continue if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible: result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes) if result: @@ -319,11 +324,12 @@ def 
select_job_private(self, cloud, machine_type, storage_bytes): return self.jpim_config.convert_requests_to_resources(machine_type, storage_bytes) def select_inst_coll( - self, cloud, machine_type, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes + self, cloud, machine_type, pool_name, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes ): if worker_type is not None and machine_type is None: result = self.select_pool_from_worker_type( cloud=cloud, + pool_name=pool_name, worker_type=worker_type, cores_mcpu=req_cores_mcpu, memory_bytes=req_memory_bytes, @@ -333,6 +339,7 @@ def select_inst_coll( elif worker_type is None and machine_type is None: result = self.select_pool_from_cost( cloud=cloud, + pool_name=pool_name, cores_mcpu=req_cores_mcpu, memory_bytes=req_memory_bytes, storage_bytes=req_storage_bytes, diff --git a/hail/python/hailtop/batch/backend.py b/hail/python/hailtop/batch/backend.py index 9d6c0d75de4..cf939bdbb18 100644 --- a/hail/python/hailtop/batch/backend.py +++ b/hail/python/hailtop/batch/backend.py @@ -657,6 +657,8 @@ async def compile_job(job): resources['storage'] = job._storage if job._machine_type: resources['machine_type'] = job._machine_type + if job._pool_name: + resources['pool_name'] = job._pool_name if job._preemptible is not None: resources['preemptible'] = job._preemptible diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index b13d960843a..8cb1f98a933 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -79,6 +79,7 @@ def __init__(self, self._always_run: bool = False self._preemptible: Optional[bool] = None self._machine_type: Optional[str] = None + self._pool_name: Optional[str] = None self._timeout: Optional[Union[int, float]] = None self._cloudfuse: List[Tuple[str, str, bool]] = [] self._env: Dict[str, str] = {} From d328bcfc73864cdf2152e87e6ebdfa70246679ba Mon Sep 17 00:00:00 2001 From: Leonhard Gruenschloss Date: Thu, 26 May 2022 16:55:49 +1000 Subject: [PATCH 2/9] Add SQL for adding seqr-specific pools --- batch/sql/add-seqr-pools.sql | 42 ++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 batch/sql/add-seqr-pools.sql diff --git a/batch/sql/add-seqr-pools.sql b/batch/sql/add-seqr-pools.sql new file mode 100644 index 00000000000..3bcaa5ed952 --- /dev/null +++ b/batch/sql/add-seqr-pools.sql @@ -0,0 +1,42 @@ + +INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`) +SELECT 'seqr-standard', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud +FROM inst_colls +WHERE name = 'standard'; + +INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`) +SELECT 'seqr-highmem', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud +FROM inst_colls +WHERE name = 'highmem'; + +INSERT INTO inst_colls (`name`, `is_pool`, `boot_disk_size_gb`, `max_instances`, `max_live_instances`, `cloud`) +SELECT 'seqr-highcpu', 1, boot_disk_size_gb, max_instances, max_live_instances, cloud +FROM inst_colls +WHERE name = 'highcpu'; + +INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`, + `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, + `preemptible`) +SELECT 'seqr-standard', worker_type, worker_cores, worker_local_ssd_data_disk, + worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + TRUE +FROM 
pools +WHERE name = 'standard'; + +INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`, + `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, + `preemptible`) +SELECT 'seqr-highmem', worker_type, worker_cores, worker_local_ssd_data_disk, + worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + TRUE +FROM pools +WHERE name = 'highmem'; + +INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data_disk`, + `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, + `preemptible`) +SELECT 'seqr-highcpu', worker_type, worker_cores, worker_local_ssd_data_disk, + worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + TRUE +FROM pools +WHERE name = 'highcpu'; From ecfc93e18600fd64ddca260018008f3f0732c4e1 Mon Sep 17 00:00:00 2001 From: Leonhard Gruenschloss Date: Thu, 26 May 2022 17:51:51 +1000 Subject: [PATCH 3/9] Allow pool name prefix --- batch/batch/front_end/front_end.py | 6 +++--- batch/batch/inst_coll_config.py | 14 +++++++------- batch/sql/add-seqr-pools.sql | 6 +++--- hail/python/hailtop/batch/backend.py | 4 ++-- hail/python/hailtop/batch/job.py | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/batch/batch/front_end/front_end.py b/batch/batch/front_end/front_end.py index 000185598db..0629ee1d505 100644 --- a/batch/batch/front_end/front_end.py +++ b/batch/batch/front_end/front_end.py @@ -886,7 +886,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh worker_type = None machine_type = resources.get('machine_type') - pool_name = resources.get('pool_name') + pool_name_prefix = resources.get('pool_name_prefix') preemptible = resources.get('preemptible', BATCH_JOB_DEFAULT_PREEMPTIBLE) if machine_type and machine_type not in valid_machine_types(cloud): @@ -895,7 +895,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh if machine_type and ('cpu' in resources or 'memory' in resources): raise web.HTTPBadRequest(reason='cannot specify cpu and memory with machine_type') - if machine_type and pool_name: + if machine_type and pool_name_prefix: raise web.HTTPBadRequest(reason='cannot specify pool name with machine_type') if spec['process']['type'] == 'jvm': @@ -965,7 +965,7 @@ async def _create_jobs(userdata: dict, job_specs: dict, batch_id: int, app: aioh inst_coll_configs: InstanceCollectionConfigs = app['inst_coll_configs'] result, exc = inst_coll_configs.select_inst_coll( - cloud, machine_type, pool_name, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes + cloud, machine_type, pool_name_prefix, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes ) if exc: diff --git a/batch/batch/inst_coll_config.py b/batch/batch/inst_coll_config.py index 9f43da5a972..74cf88f0679 100644 --- a/batch/batch/inst_coll_config.py +++ b/batch/batch/inst_coll_config.py @@ -273,13 +273,13 @@ async def refresh(self, db: Database): self.resource_rates = resource_rates self.product_versions.update(product_versions_data) - def select_pool_from_cost(self, cloud, pool_name, cores_mcpu, memory_bytes, storage_bytes, preemptible): + def select_pool_from_cost(self, cloud, pool_name_prefix, cores_mcpu, memory_bytes, storage_bytes, preemptible): assert self.resource_rates is not None optimal_result = None optimal_cost = None for pool in self.name_pool_config.values(): - if pool_name and pool.name != 
pool_name: + if pool_name_prefix and not pool.name.startswith(pool_name_prefix): continue if pool.cloud != cloud or pool.preemptible != preemptible: @@ -307,9 +307,9 @@ def select_pool_from_cost(self, cloud, pool_name, cores_mcpu, memory_bytes, stor optimal_result = (pool.name, maybe_cores_mcpu, maybe_memory_bytes, maybe_storage_gib) return optimal_result - def select_pool_from_worker_type(self, cloud, pool_name, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible): + def select_pool_from_worker_type(self, cloud, pool_name_prefix, worker_type, cores_mcpu, memory_bytes, storage_bytes, preemptible): for pool in self.name_pool_config.values(): - if pool_name and pool.name != pool_name: + if pool_name_prefix and not pool.name.startswith(pool_name_prefix): continue if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible: result = pool.convert_requests_to_resources(cores_mcpu, memory_bytes, storage_bytes) @@ -324,12 +324,12 @@ def select_job_private(self, cloud, machine_type, storage_bytes): return self.jpim_config.convert_requests_to_resources(machine_type, storage_bytes) def select_inst_coll( - self, cloud, machine_type, pool_name, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes + self, cloud, machine_type, pool_name_prefix, preemptible, worker_type, req_cores_mcpu, req_memory_bytes, req_storage_bytes ): if worker_type is not None and machine_type is None: result = self.select_pool_from_worker_type( cloud=cloud, - pool_name=pool_name, + pool_name_prefix=pool_name_prefix, worker_type=worker_type, cores_mcpu=req_cores_mcpu, memory_bytes=req_memory_bytes, @@ -339,7 +339,7 @@ def select_inst_coll( elif worker_type is None and machine_type is None: result = self.select_pool_from_cost( cloud=cloud, - pool_name=pool_name, + pool_name_prefix=pool_name_prefix, cores_mcpu=req_cores_mcpu, memory_bytes=req_memory_bytes, storage_bytes=req_storage_bytes, diff --git a/batch/sql/add-seqr-pools.sql b/batch/sql/add-seqr-pools.sql index 3bcaa5ed952..7a4de2a0664 100644 --- a/batch/sql/add-seqr-pools.sql +++ b/batch/sql/add-seqr-pools.sql @@ -18,7 +18,7 @@ INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, `preemptible`) SELECT 'seqr-standard', worker_type, worker_cores, worker_local_ssd_data_disk, - worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores, TRUE FROM pools WHERE name = 'standard'; @@ -27,7 +27,7 @@ INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, `preemptible`) SELECT 'seqr-highmem', worker_type, worker_cores, worker_local_ssd_data_disk, - worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores, TRUE FROM pools WHERE name = 'highmem'; @@ -36,7 +36,7 @@ INSERT INTO pools (`name`, `worker_type`, `worker_cores`, `worker_local_ssd_data `worker_external_ssd_data_disk_size_gb`, `enable_standing_worker`, `standing_worker_cores`, `preemptible`) SELECT 'seqr-highcpu', worker_type, worker_cores, worker_local_ssd_data_disk, - worker_external_ssd_data_disk_size_gb, enable_standing_worker, standing_worker_cores, + worker_external_ssd_data_disk_size_gb, FALSE, standing_worker_cores, TRUE FROM 
pools WHERE name = 'highcpu'; diff --git a/hail/python/hailtop/batch/backend.py b/hail/python/hailtop/batch/backend.py index cf939bdbb18..05a51ecfae5 100644 --- a/hail/python/hailtop/batch/backend.py +++ b/hail/python/hailtop/batch/backend.py @@ -657,8 +657,8 @@ async def compile_job(job): resources['storage'] = job._storage if job._machine_type: resources['machine_type'] = job._machine_type - if job._pool_name: - resources['pool_name'] = job._pool_name + if job._pool_name_prefix: + resources['pool_name_prefix'] = job._pool_name_prefix if job._preemptible is not None: resources['preemptible'] = job._preemptible diff --git a/hail/python/hailtop/batch/job.py b/hail/python/hailtop/batch/job.py index 8cb1f98a933..dbf986b8623 100644 --- a/hail/python/hailtop/batch/job.py +++ b/hail/python/hailtop/batch/job.py @@ -79,7 +79,7 @@ def __init__(self, self._always_run: bool = False self._preemptible: Optional[bool] = None self._machine_type: Optional[str] = None - self._pool_name: Optional[str] = None + self._pool_name_prefix: Optional[str] = None self._timeout: Optional[Union[int, float]] = None self._cloudfuse: List[Tuple[str, str, bool]] = [] self._env: Dict[str, str] = {} From b3564a26352d729a256d314d67f457f9e0b1c190 Mon Sep 17 00:00:00 2001 From: Leonhard Gruenschloss Date: Thu, 26 May 2022 19:07:10 +1000 Subject: [PATCH 4/9] Use pool labels --- .../batch/driver/instance_collection/pool.py | 3 +++ batch/batch/driver/main.py | 1 + batch/batch/driver/templates/pool.html | 1 + batch/batch/front_end/front_end.py | 8 +++--- batch/batch/inst_coll_config.py | 26 +++++++++---------- batch/sql/add-pool-label.sql | 1 + batch/sql/add-seqr-pools.sql | 13 +++++----- build.yaml | 2 ++ hail/python/hailtop/batch/backend.py | 4 +-- hail/python/hailtop/batch/job.py | 2 +- 10 files changed, 35 insertions(+), 26 deletions(-) create mode 100644 batch/sql/add-pool-label.sql diff --git a/batch/batch/driver/instance_collection/pool.py b/batch/batch/driver/instance_collection/pool.py index 98c2549b837..8fb519881d9 100644 --- a/batch/batch/driver/instance_collection/pool.py +++ b/batch/batch/driver/instance_collection/pool.py @@ -109,6 +109,7 @@ def __init__( self.data_disk_size_gb = config.data_disk_size_gb self.data_disk_size_standing_gb = config.data_disk_size_standing_gb self.preemptible = config.preemptible + self.label = config.label @property def local_ssd_data_disk(self) -> bool: @@ -130,6 +131,7 @@ def config(self): 'max_instances': self.max_instances, 'max_live_instances': self.max_live_instances, 'preemptible': self.preemptible, + 'label': self.label, } def configure(self, pool_config: PoolConfig): @@ -148,6 +150,7 @@ def configure(self, pool_config: PoolConfig): self.max_instances = pool_config.max_instances self.max_live_instances = pool_config.max_live_instances self.preemptible = pool_config.preemptible + self.label = pool_config.label def adjust_for_remove_instance(self, instance): super().adjust_for_remove_instance(instance) diff --git a/batch/batch/driver/main.py b/batch/batch/driver/main.py index e80b726072e..4084aa726d0 100644 --- a/batch/batch/driver/main.py +++ b/batch/batch/driver/main.py @@ -537,6 +537,7 @@ async def pool_config_update(request, userdata): # pylint: disable=unused-argum max_instances, max_live_instances, pool.preemptible, + pool.label, ) await pool_config.update_database(db) pool.configure(pool_config) diff --git a/batch/batch/driver/templates/pool.html b/batch/batch/driver/templates/pool.html index 19c84302980..23a5d706362 100644 --- a/batch/batch/driver/templates/pool.html +++ 
b/batch/batch/driver/templates/pool.html
@@ -31,6 +31,7 @@
 
 Configuration
 
 Standing worker cores:
 Max instances:
 Max live instances:
+Label:
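For reference, a minimal standalone sketch of the prefix filter these patches add to pool selection (as of patch 3, before patch 4 moves to labels). `Pool` below is a simplified stand-in dataclass, not Batch's real `PoolConfig`, and the function only mirrors the worker-type path; it is an illustration of the filtering order, not the actual implementation.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Pool:
    # Simplified stand-in for batch's PoolConfig; only the fields the filter needs.
    name: str
    cloud: str
    worker_type: str
    preemptible: bool


def select_pool_from_worker_type(pools: List[Pool], cloud: str, worker_type: str,
                                 preemptible: bool,
                                 pool_name_prefix: Optional[str] = None) -> Optional[Pool]:
    # Mirrors the filtering order introduced in patch 3: pools whose name does not
    # start with the requested prefix are skipped before the cloud / worker_type /
    # preemptible match is applied.
    for pool in pools:
        if pool_name_prefix and not pool.name.startswith(pool_name_prefix):
            continue
        if pool.cloud == cloud and pool.worker_type == worker_type and pool.preemptible == preemptible:
            return pool
    return None


pools = [
    Pool('standard', 'gcp', 'standard', True),
    Pool('seqr-standard', 'gcp', 'standard', True),
]

# Without a prefix the default 'standard' pool wins; with prefix 'seqr-' only the
# seqr-specific pool created by add-seqr-pools.sql is eligible.
assert select_pool_from_worker_type(pools, 'gcp', 'standard', True).name == 'standard'
assert select_pool_from_worker_type(pools, 'gcp', 'standard', True, 'seqr-').name == 'seqr-standard'
```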