This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Bug: fix job submission cluster data issues #533

Merged
2 changes: 2 additions & 0 deletions aztk/client.py
@@ -290,6 +290,8 @@ def __submit_job(self,
         :param vm_image_model -> aztk_sdk.models.VmImage
         :returns None
         """
+        self._get_cluster_data(job_configuration.id).save_cluster_config(job_configuration.to_cluster_config())
+
         # get a verified node agent sku
         sku_to_use, image_ref_to_use = \
             helpers.select_latest_verified_vm_image_with_node_agent_sku(
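
The added line is the write side of a write/read pair: the client saves the cluster configuration under job_configuration.id, and the node later reads it back by the same id (see the aztk/node_scripts/install/install.py hunk below). A minimal sketch of that pairing, using only names visible in this diff; it is an illustration of the flow, not the SDK's actual code:

    # client side (this hunk): save the derived cluster config under the job's id
    self._get_cluster_data(job_configuration.id).save_cluster_config(
        job_configuration.to_cluster_config())

    # node side (install.py hunk below): read it back with the same id
    data = cluster_data.ClusterData(config.blob_client, config.cluster_id)
    cluster_config = data.read_cluster_config()
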
1 change: 1 addition & 0 deletions aztk/node_scripts/core/config.py
@@ -25,6 +25,7 @@
 batch_resource_id = os.environ.get("SP_BATCH_RESOURCE_ID")
 storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")
 
+cluster_id = os.environ.get("AZTK_CLUSTER_ID")
 pool_id = os.environ["AZ_BATCH_POOL_ID"]
 node_id = os.environ["AZ_BATCH_NODE_ID"]
 is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
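
Note the lookup style: AZTK_CLUSTER_ID is read with os.environ.get, which yields None when the variable is absent, while the Batch-provided variables below use direct indexing and raise KeyError if missing. A tiny sketch of a fallback that is not part of this PR (purely an assumption) but shows how an older pool without the new variable could still resolve an id:

    import os

    # Fall back to the Batch pool id when AZTK_CLUSTER_ID was never exported,
    # e.g. on a pool created by a client that predates this change (assumption).
    cluster_id = os.environ.get("AZTK_CLUSTER_ID") or os.environ["AZ_BATCH_POOL_ID"]
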
2 changes: 1 addition & 1 deletion aztk/node_scripts/install/install.py
@@ -6,7 +6,7 @@
 from aztk.internal import cluster_data
 
 def read_cluster_config():
-    data = cluster_data.ClusterData(config.blob_client, config.pool_id)
+    data = cluster_data.ClusterData(config.blob_client, config.cluster_id)
     cluster_config = data.read_cluster_config()
     print("Got cluster config", cluster_config)
     return cluster_config
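
This is presumably the core of the fix: for a submitted job the Batch pool gets an auto-generated id, so it need not match the id the client saved the config under, and reading by config.pool_id would find nothing. The new config.cluster_id comes from the AZTK_CLUSTER_ID environment variable set by the start task. A small illustration with made-up ids (both values are assumptions for the example):

    import os

    os.environ["AZ_BATCH_POOL_ID"] = "job-1234-autopool"   # assigned by Batch (made up)
    os.environ["AZTK_CLUSTER_ID"] = "my-spark-job"         # exported by the new start task env
    # The saved config is keyed by the aztk id, so the read must use the same key:
    assert os.environ["AZTK_CLUSTER_ID"] != os.environ["AZ_BATCH_POOL_ID"]
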
2 changes: 2 additions & 0 deletions aztk/spark/client.py
@@ -45,6 +45,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
 
         start_task = create_cluster_helper.generate_cluster_start_task(self,
             zip_resource_files,
+            cluster_conf.cluster_id,
             cluster_conf.gpu_enabled(),
             cluster_conf.docker_repo,
             cluster_conf.file_shares,
@@ -192,6 +193,7 @@ def submit_job(self, job_configuration):
 
         start_task = create_cluster_helper.generate_cluster_start_task(self,
             zip_resource_files,
+            job_configuration.id,
             job_configuration.gpu_enabled,
             job_configuration.docker_repo,
             mixed_mode=job_configuration.mixed_mode(),
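
Both call sites now thread the identifier through to the start task: cluster_conf.cluster_id when creating a cluster and job_configuration.id when submitting a job, matching the id the configuration was saved under. A hedged sketch of the updated job-submission call, showing only the arguments visible in this diff (remaining parameters are omitted, not guessed):

    start_task = create_cluster_helper.generate_cluster_start_task(
        self,
        zip_resource_files,
        job_configuration.id,             # new third argument
        job_configuration.gpu_enabled,
        job_configuration.docker_repo,
        mixed_mode=job_configuration.mixed_mode())
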
6 changes: 4 additions & 2 deletions aztk/spark/helpers/create_cluster.py
@@ -11,11 +11,12 @@
         scope=batch_models.AutoUserScope.pool,
         elevation_level=batch_models.ElevationLevel.admin))
 
-def _get_aztk_environment(worker_on_master, mixed_mode):
+def _get_aztk_environment(cluster_id, worker_on_master, mixed_mode):
     envs = []
     envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=helpers.bool_env(mixed_mode)))
     envs.append(batch_models.EnvironmentSetting(
         name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
+    envs.append(batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id))
     return envs
 
 def __get_docker_credentials(spark_client):
@@ -115,6 +116,7 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
 def generate_cluster_start_task(
         spark_client,
         zip_resource_file: batch_models.ResourceFile,
+        cluster_id: str,
         gpu_enabled: bool,
         docker_repo: str = None,
         file_shares: List[aztk_models.FileShare] = None,
@@ -149,7 +151,7 @@ def generate_cluster_start_task(
             name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
         batch_models.EnvironmentSetting(
             name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
-    ] + __get_docker_credentials(spark_client) + _get_aztk_environment(worker_on_master, mixed_mode)
+    ] + __get_docker_credentials(spark_client) + _get_aztk_environment(cluster_id, worker_on_master, mixed_mode)
 
     # start task command
     command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares, mixed_mode)
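
_get_aztk_environment now receives the cluster id and emits it as an AZTK_CLUSTER_ID environment setting on the start task, which is what aztk/node_scripts/core/config.py reads back on the node. A minimal sketch of that wiring, using the azure-batch EnvironmentSetting model shown in the diff; the literal id is made up for the example:

    import os
    import azure.batch.models as batch_models

    setting = batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value="my-spark-job")
    # Batch exports start-task environment settings before the task runs, so on
    # the node this is roughly equivalent to:
    os.environ[setting.name] = setting.value
    assert os.environ.get("AZTK_CLUSTER_ID") == "my-spark-job"
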