
Bug: add support for jars, pyfiles, files in Jobs (#408)
* add support for jars, pyfiles, files, refactor JobConfig

* set encoding explicitly

* fix TypeError bug in mixed_mode()
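
For context, a hedged sketch of what the fix enables. This is illustrative only (the names and paths are invented); the keyword names match the ApplicationConfiguration calls in the diff below. A Job application can now declare jars, py_files, and files and have them reach spark-submit instead of being silently dropped:

    # Illustrative only: names and paths here are invented; the keyword
    # names are the ones ApplicationConfiguration accepts in this diff.
    import aztk.spark.models

    app = aztk.spark.models.ApplicationConfiguration(
        name="wordcount",                          # hypothetical job app
        application="/path/to/wordcount.py",       # hypothetical path
        application_args=["/datasets/text.txt"],
        jars=["deps/extra.jar"],                   # honored after this fix
        py_files=["helpers.zip"],                  # honored after this fix
        files=["lookup.csv"],                      # honored after this fix
    )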
jafreck committed Mar 26, 2018
1 parent 5761a36 commit 2dd7891
Showing 4 changed files with 39 additions and 35 deletions.
3 changes: 3 additions & 0 deletions aztk/node_scripts/submit.py
@@ -117,6 +117,9 @@ def __app_submit_cmd(
         os.environ['AZ_BATCH_TASK_WORKING_DIR'] + '/' + app + ' ' +
         ' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))
 
+    with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
+        stream.write(spark_submit_cmd.to_str())
+
     return spark_submit_cmd


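A note on the "set encoding explicitly" bullet: without encoding=, open() falls back to locale.getpreferredencoding(), which varies by host (for example, cp1252 on Windows), so writing non-ASCII application arguments could raise UnicodeEncodeError. A minimal standalone sketch; the command string is invented:

    # Standalone sketch of the behavior being pinned down; the command
    # string is invented, only the open() call mirrors the diff above.
    cmd = "spark-submit --class com.example.Main 'app.jar' 'héllo'"

    with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
        stream.write(cmd)  # always UTF-8, regardless of the host locale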
4 changes: 2 additions & 2 deletions aztk/spark/models/models.py
@@ -207,8 +207,8 @@ def __init__(
             custom_scripts=None,
             spark_configuration=None,
             docker_repo=None,
-            max_dedicated_nodes=None,
-            max_low_pri_nodes=None,
+            max_dedicated_nodes=0,
+            max_low_pri_nodes=0,
             subnet_id=None,
             worker_on_master=None):
         self.id = id
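Defaulting the node counts to 0 rather than None is what the "fix TypeError bug in mixed_mode()" bullet refers to. The body of mixed_mode() is not shown in this diff; assuming it compares the two counts, a sketch of the failure mode:

    # Hypothetical reconstruction; mixed_mode()'s body is not in this
    # diff. Assume it checks that both node pools are in use:
    def mixed_mode(max_dedicated_nodes, max_low_pri_nodes):
        return max_dedicated_nodes > 0 and max_low_pri_nodes > 0

    mixed_mode(1, 1)         # True
    mixed_mode(0, 0)         # False with the new defaults
    # mixed_mode(None, 0)    # old None default raised:
    #                        # TypeError: '>' not supported between
    #                        # instances of 'NoneType' and 'int'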
44 changes: 33 additions & 11 deletions aztk_cli/config.py
@@ -309,8 +309,8 @@ def __init__(self):
         self.spark_configuration = None
         self.vm_size = None
         self.docker_repo = None
-        self.max_dedicated_nodes = None
-        self.max_low_pri_nodes = None
+        self.max_dedicated_nodes = 0
+        self.max_low_pri_nodes = 0
         self.spark_defaults_conf = None
         self.spark_env_sh = None
         self.core_site_xml = None
@@ -327,13 +327,37 @@ def _merge_dict(self, config):
         if cluster_configuration:
             self.vm_size = cluster_configuration.get('vm_size')
             self.docker_repo = cluster_configuration.get('docker_repo')
-            self.max_dedicated_nodes = cluster_configuration.get('size')
-            self.max_low_pri_nodes = cluster_configuration.get('size_low_pri')
+            if cluster_configuration.get('size') is not None:
+                self.max_dedicated_nodes = cluster_configuration.get('size')
+            if cluster_configuration.get('size_low_pri') is not None:
+                self.max_low_pri_nodes = cluster_configuration.get('size_low_pri')
             self.custom_scripts = cluster_configuration.get('custom_scripts')
             self.subnet_id = cluster_configuration.get('subnet_id')
             self.worker_on_master = cluster_configuration.get("worker_on_master")
 
-        self.applications = config.get('applications')
+        applications = config.get('applications')
+        if applications:
+            self.applications = []
+            for application in applications:
+                self.applications.append(
+                    aztk.spark.models.ApplicationConfiguration(
+                        name=application.get('name'),
+                        application=application.get('application'),
+                        application_args=application.get('application_args'),
+                        main_class=application.get('main_class'),
+                        jars=application.get('jars'),
+                        py_files=application.get('py_files'),
+                        files=application.get('files'),
+                        driver_java_options=application.get('driver_java_options'),
+                        driver_library_path=application.get('driver_library_path'),
+                        driver_class_path=application.get('driver_class_path'),
+                        driver_memory=application.get('driver_memory'),
+                        executor_memory=application.get('executor_memory'),
+                        driver_cores=application.get('driver_cores'),
+                        executor_cores=application.get('executor_cores')
+                    )
+                )
 
         spark_configuration = config.get('spark_configuration')
         if spark_configuration:
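
The "is not None" guards at the top of this hunk matter because the new defaults are 0: an unconditional self.max_dedicated_nodes = cluster_configuration.get('size') would reset the count to None whenever job.yaml omits the key. A small standalone sketch of the semantics:

    # Standalone sketch: yaml values override the 0 defaults only when
    # the key is actually present. Hypothetical config with no
    # 'size_low_pri' key:
    cluster_configuration = {'size': 2}
    max_dedicated_nodes = 0
    max_low_pri_nodes = 0

    if cluster_configuration.get('size') is not None:
        max_dedicated_nodes = cluster_configuration.get('size')        # now 2
    if cluster_configuration.get('size_low_pri') is not None:
        max_low_pri_nodes = cluster_configuration.get('size_low_pri')  # skipped

    print(max_low_pri_nodes)    # 0, instead of being reset to None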
@@ -379,14 +403,12 @@ def merge(self, id, job_config_yaml=None):
         self.id = id
 
         for entry in self.applications:
-            if entry['name'] is None:
+            if entry.name is None:
                 raise aztk.error.AztkError(
-                    "Application specified with no name. Please verify your configuration in job.yaml"
-                )
-            if entry['application'] is None:
+                    "Application specified with no name. Please verify your configuration in job.yaml")
+            if entry.application is None:
                 raise aztk.error.AztkError(
-                    "No path to application specified for {} in job.yaml".
-                    format(entry['name']))
+                    "No path to application specified for {} in job.yaml".format(entry.name))
 
 
 def get_file_if_exists(file):
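The switch from entry['name'] to entry.name follows from the _merge_dict change above: self.applications now holds ApplicationConfiguration objects rather than raw dicts, and those objects are not subscriptable. A standalone sketch with a stand-in class:

    # Stand-in for aztk.spark.models.ApplicationConfiguration.
    class ApplicationConfiguration:
        def __init__(self, name=None, application=None):
            self.name = name
            self.application = application

    entry = ApplicationConfiguration(name="wordcount")
    print(entry.name)    # wordcount
    # entry['name'] would raise:
    # TypeError: 'ApplicationConfiguration' object is not subscriptable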
23 changes: 1 addition & 22 deletions aztk_cli/spark/endpoints/job/submit.py
@@ -23,27 +23,6 @@ def execute(args: typing.NamedTuple):

     job_conf.merge(args.job_id, args.job_conf)
 
-    aztk_applications = []
-    for application in job_conf.applications:
-        aztk_applications.append(
-            aztk.spark.models.ApplicationConfiguration(
-                name=application.get('name'),
-                application=application.get('application'),
-                application_args=application.get('application_args'),
-                main_class=application.get('main_class'),
-                jars=[],
-                py_files=[],
-                files=[],
-                driver_java_options=application.get('driver_java_options'),
-                driver_library_path=application.get('driver_library_path'),
-                driver_class_path=application.get('driver_class_path'),
-                driver_memory=application.get('driver_memory'),
-                executor_memory=application.get('executor_memory'),
-                driver_cores=application.get('driver_cores'),
-                executor_cores=application.get('executor_cores')
-            )
-        )
-
     # by default, load spark configuration files in .aztk/
     spark_configuration = config.load_aztk_spark_config()
     # overwrite with values in job_conf if they exist
@@ -56,7 +35,7 @@

     job_configuration = aztk.spark.models.JobConfiguration(
         id=job_conf.id,
-        applications=aztk_applications,
+        applications=job_conf.applications,
         custom_scripts=job_conf.custom_scripts,
         spark_configuration=spark_configuration,
         vm_size=job_conf.vm_size,
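Net effect of the two hunks above: the root cause of #408 was this endpoint rebuilding every application with jars=[], py_files=[], files=[] hard-coded, so those job.yaml settings never reached spark-submit. With construction centralized in aztk_cli/config.py, the endpoint now passes the objects through intact. A trimmed sketch of the resulting call (keyword list abbreviated; the real call also passes the remaining fields shown above):

    # Trimmed sketch; the real call above also passes custom_scripts,
    # vm_size, docker_repo, the node counts, subnet_id and
    # worker_on_master.
    job_configuration = aztk.spark.models.JobConfiguration(
        id=job_conf.id,
        applications=job_conf.applications,    # jars/py_files/files intact
        spark_configuration=spark_configuration,
    )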
