This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Bug: fix job submission cluster data issues #533

Merged
2 changes: 2 additions & 0 deletions aztk/client.py
@@ -290,6 +290,8 @@ def __submit_job(self,
:param vm_image_model -> aztk_sdk.models.VmImage
:returns None
"""
+ self._get_cluster_data(job_configuration.id).save_cluster_config(job_configuration.to_cluster_config())

# get a verified node agent sku
sku_to_use, image_ref_to_use = \
helpers.select_latest_verified_vm_image_with_node_agent_sku(
1 change: 1 addition & 0 deletions aztk/node_scripts/core/config.py
@@ -25,6 +25,7 @@
batch_resource_id = os.environ.get("SP_BATCH_RESOURCE_ID")
storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")

+ cluster_id = os.environ.get("AZTK_CLUSTER_ID")
pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
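The new AZTK_CLUSTER_ID lookup uses os.environ.get, which returns None instead of raising when the variable is absent, while the Batch-provided values below it keep direct indexing and fail loudly. A minimal, hedged illustration of that distinction (runnable anywhere; the variable name is the one added above):

```python
import os

# Required Batch-provided values use direct indexing and raise KeyError when unset:
#     pool_id = os.environ["AZ_BATCH_POOL_ID"]
# The new variable uses .get(), which returns None instead of raising, so a node
# image that does not set AZTK_CLUSTER_ID still imports the config module cleanly.
cluster_id = os.environ.get("AZTK_CLUSTER_ID")
print("cluster_id:", cluster_id)  # prints None outside an AZTK-provisioned node
```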
2 changes: 1 addition & 1 deletion aztk/node_scripts/install/install.py
@@ -6,7 +6,7 @@
from aztk.internal import cluster_data

def read_cluster_config():
- data = cluster_data.ClusterData(config.blob_client, config.pool_id)
+ data = cluster_data.ClusterData(config.blob_client, config.cluster_id)
cluster_config = data.read_cluster_config()
print("Got cluster config", cluster_config)
return cluster_config
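With the client.py change earlier and the install.py change above, the write and read sides are now keyed by the same identifier: the client saves the cluster configuration under the cluster (or job) id, and the node reads it back using the AZTK_CLUSTER_ID it receives through the start task environment, rather than the Batch pool id it happens to be running in. A minimal sketch of that round trip using only the calls visible in this diff; the wrapper function names are illustrative, and blob client/credential wiring is omitted:

```python
from aztk.internal import cluster_data

def save_job_cluster_config(client, job_configuration):
    """Write side (aztk/client.py, __submit_job): persist the config keyed by the job id."""
    client._get_cluster_data(job_configuration.id).save_cluster_config(
        job_configuration.to_cluster_config())

def read_node_cluster_config(blob_client, cluster_id):
    """Read side (aztk/node_scripts/install/install.py): fetch the config back under the
    same id, now taken from AZTK_CLUSTER_ID instead of AZ_BATCH_POOL_ID."""
    return cluster_data.ClusterData(blob_client, cluster_id).read_cluster_config()
```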
2 changes: 2 additions & 0 deletions aztk/spark/client.py
@@ -45,6 +45,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =

start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
+ cluster_conf.cluster_id,
cluster_conf.gpu_enabled(),
cluster_conf.docker_repo,
cluster_conf.file_shares,
@@ -192,6 +193,7 @@ def submit_job(self, job_configuration):

start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
+ job_configuration.id,
job_configuration.gpu_enabled,
job_configuration.docker_repo,
mixed_mode=job_configuration.mixed_mode(),
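Both call sites now hand the identifier to generate_cluster_start_task: cluster_conf.cluster_id for create_cluster and job_configuration.id for submit_job. The helper (changed in create_cluster.py below) turns it into an Azure Batch environment setting on the start task, and Batch exposes start task environment settings as ordinary environment variables on each node, which is how the value reaches node_scripts/core/config.py. A simplified, hedged sketch of that mechanism; this is not the start task aztk actually builds:

```python
import azure.batch.models as batch_models

def minimal_start_task(cluster_id: str) -> batch_models.StartTask:
    # Environment settings attached to the pool's start task become environment
    # variables of the start task process on every node, so the node scripts can
    # later read AZTK_CLUSTER_ID with os.environ.get().
    return batch_models.StartTask(
        command_line="/bin/bash -c \"env | grep AZTK_\"",
        environment_settings=[
            batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id),
        ],
    )
```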
6 changes: 4 additions & 2 deletions aztk/spark/helpers/create_cluster.py
@@ -11,11 +11,12 @@
scope=batch_models.AutoUserScope.pool,
elevation_level=batch_models.ElevationLevel.admin))

- def _get_aztk_environment(worker_on_master, mixed_mode):
+ def _get_aztk_environment(cluster_id, worker_on_master, mixed_mode):
envs = []
envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=helpers.bool_env(mixed_mode)))
envs.append(batch_models.EnvironmentSetting(
name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
+ envs.append(batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id))
return envs

def __get_docker_credentials(spark_client):
@@ -115,6 +116,7 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
def generate_cluster_start_task(
spark_client,
zip_resource_file: batch_models.ResourceFile,
+ cluster_id: str,
gpu_enabled: bool,
docker_repo: str = None,
file_shares: List[aztk_models.FileShare] = None,
@@ -149,7 +151,7 @@ def generate_cluster_start_task(
name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(
name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
- ] + __get_docker_credentials(spark_client) + _get_aztk_environment(worker_on_master, mixed_mode)
+ ] + __get_docker_credentials(spark_client) + _get_aztk_environment(cluster_id, worker_on_master, mixed_mode)

# start task command
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares, mixed_mode)
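A quick way to sanity-check the helper change above is to call _get_aztk_environment directly and confirm the new setting is emitted. A hedged sketch; the module path mirrors this diff, but _get_aztk_environment is a private helper, so treat this as illustrative rather than a supported API:

```python
from aztk.spark.helpers import create_cluster as create_cluster_helper

# Build the environment settings the start task would receive for a given cluster id.
envs = create_cluster_helper._get_aztk_environment(
    "my-cluster", worker_on_master=True, mixed_mode=False)
by_name = {setting.name: setting.value for setting in envs}

assert by_name["AZTK_CLUSTER_ID"] == "my-cluster"
assert "AZTK_MIXED_MODE" in by_name and "AZTK_WORKER_ON_MASTER" in by_name
```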