Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: task affinity to master node (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
jafreck committed Feb 23, 2018
1 parent e188170 commit 146345d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
9 changes: 9 additions & 0 deletions aztk/spark/helpers/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,20 @@ def generate_task(spark_client, container_id, application):
return task


def affinitize_task_to_master(spark_client, cluster_id, task):
    """
    Attach the master node's affinity id to a task

    Fetches the cluster with spark_client.get_cluster(cluster_id), looks up
    the compute node named by its master_node_id in the pool whose id is
    cluster_id, and stores an AffinityInformation built from that node's
    affinity_id on task.affinity_info.

    The task object is modified in place and also returned.
    """
    cluster_info = spark_client.get_cluster(cluster_id)
    node_operations = spark_client.batch_client.compute_node
    master = node_operations.get(
        pool_id=cluster_id,
        node_id=cluster_info.master_node_id,
    )
    affinity = batch_models.AffinityInformation(affinity_id=master.affinity_id)
    task.affinity_info = affinity
    return task


def submit_application(spark_client, cluster_id, application, wait: bool = False):
"""
Submit a spark app
"""
task = generate_task(spark_client, cluster_id, application)
task = affinitize_task_to_master(spark_client, cluster_id, task)


# Add task to batch job (which has the same name as cluster_id)
job_id = cluster_id
Expand Down
21 changes: 16 additions & 5 deletions node_scripts/job_submission.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import sys
import datetime
import os
import yaml
import subprocess
import datetime
import sys
from typing import List
import azure.storage.blob as blob
import azure.batch.models as batch_models
import azure.storage.blob as blob
import yaml
from command_builder import CommandBuilder
from core import config
from install.pick_master import get_master_node_id


def affinitize_task_to_master(batch_client, cluster_id, task):
    '''
    Attach the master node's affinity id to a task

    :param batch_client: Batch client exposing the pool and compute_node operations
    :param cluster_id: id of the pool to look the master node up in
    :param task: task object to update; it is modified in place
    :return: the same task, with affinity_info set to an AffinityInformation
        built from the master node's affinity_id
    '''
    # Read the master node id from the same pool that the node lookup below
    # queries. A node id taken from one pool is only meaningful in that pool,
    # so both calls must be keyed by cluster_id rather than mixing it with a
    # module-level pool id.
    pool = batch_client.pool.get(cluster_id)
    master_node_id = get_master_node_id(pool)
    master_node = batch_client.compute_node.get(pool_id=cluster_id, node_id=master_node_id)
    task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
    return task


def schedule_tasks(tasks_path):
Expand All @@ -16,14 +25,16 @@ def schedule_tasks(tasks_path):
'''
batch_client = config.batch_client
blob_client = config.blob_client

for task_definition in tasks_path:
with open(task_definition, 'r') as stream:
try:
task = yaml.load(stream)
except yaml.YAMLError as exc:
print(exc)

# affinitize task to master
task = affinitize_task_to_master(batch_client, os.environ["AZ_BATCH_POOL_ID"], task)
# schedule the task
batch_client.task.add(job_id=os.environ['AZ_BATCH_JOB_ID'], task=task)

Expand Down

0 comments on commit 146345d

Please sign in to comment.