From 2dd7891499335d26bd57fb1ba6a6983db0278102 Mon Sep 17 00:00:00 2001
From: Jacob Freck
Date: Mon, 26 Mar 2018 11:38:05 -0700
Subject: [PATCH] Bug: add support for jars, pyfiles, files in Jobs (#408)

* add support for jars, pyfiles, files; refactor JobConfig

* set encoding explicitly

* fix TypeError bug in mixed_mode()
---
 aztk/node_scripts/submit.py            |  3 ++
 aztk/spark/models/models.py            |  4 +--
 aztk_cli/config.py                     | 43 ++++++++++++++++++-------
 aztk_cli/spark/endpoints/job/submit.py | 23 +--------------
 4 files changed, 38 insertions(+), 35 deletions(-)

diff --git a/aztk/node_scripts/submit.py b/aztk/node_scripts/submit.py
index b8b99633..8f799e7d 100644
--- a/aztk/node_scripts/submit.py
+++ b/aztk/node_scripts/submit.py
@@ -117,6 +117,9 @@ def __app_submit_cmd(
         os.environ['AZ_BATCH_TASK_WORKING_DIR'] + '/' + app + ' ' +
         ' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))
 
+    with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
+        stream.write(spark_submit_cmd.to_str())
+
     return spark_submit_cmd
 
 
diff --git a/aztk/spark/models/models.py b/aztk/spark/models/models.py
index 01fde293..d0c20173 100644
--- a/aztk/spark/models/models.py
+++ b/aztk/spark/models/models.py
@@ -207,8 +207,8 @@ def __init__(
             custom_scripts=None,
             spark_configuration=None,
             docker_repo=None,
-            max_dedicated_nodes=None,
-            max_low_pri_nodes=None,
+            max_dedicated_nodes=0,
+            max_low_pri_nodes=0,
             subnet_id=None,
             worker_on_master=None):
         self.id = id
diff --git a/aztk_cli/config.py b/aztk_cli/config.py
index 716e2e48..7abc810a 100644
--- a/aztk_cli/config.py
+++ b/aztk_cli/config.py
@@ -309,8 +309,8 @@ def __init__(self):
         self.spark_configuration = None
         self.vm_size = None
         self.docker_repo = None
-        self.max_dedicated_nodes = None
-        self.max_low_pri_nodes = None
+        self.max_dedicated_nodes = 0
+        self.max_low_pri_nodes = 0
         self.spark_defaults_conf = None
         self.spark_env_sh = None
         self.core_site_xml = None
@@ -327,13 +327,36 @@ def _merge_dict(self, config):
         if cluster_configuration:
             self.vm_size = cluster_configuration.get('vm_size')
             self.docker_repo = cluster_configuration.get('docker_repo')
-            self.max_dedicated_nodes = cluster_configuration.get('size')
-            self.max_low_pri_nodes = cluster_configuration.get('size_low_pri')
+            if cluster_configuration.get('size') is not None:
+                self.max_dedicated_nodes = cluster_configuration.get('size')
+            if cluster_configuration.get('size_low_pri') is not None:
+                self.max_low_pri_nodes = cluster_configuration.get('size_low_pri')
             self.custom_scripts = cluster_configuration.get('custom_scripts')
             self.subnet_id = cluster_configuration.get('subnet_id')
             self.worker_on_master = cluster_configuration.get("worker_on_master")
 
-        self.applications = config.get('applications')
+        applications = config.get('applications')
+        if applications:
+            self.applications = []
+            for application in applications:
+                self.applications.append(
+                    aztk.spark.models.ApplicationConfiguration(
+                        name=application.get('name'),
+                        application=application.get('application'),
+                        application_args=application.get('application_args'),
+                        main_class=application.get('main_class'),
+                        jars=application.get('jars'),
+                        py_files=application.get('py_files'),
+                        files=application.get('files'),
+                        driver_java_options=application.get('driver_java_options'),
+                        driver_library_path=application.get('driver_library_path'),
+                        driver_class_path=application.get('driver_class_path'),
+                        driver_memory=application.get('driver_memory'),
+                        executor_memory=application.get('executor_memory'),
+                        driver_cores=application.get('driver_cores'),
+                        executor_cores=application.get('executor_cores')
+                    )
+                )
 
         spark_configuration = config.get('spark_configuration')
         if spark_configuration:
@@ -379,14 +402,12 @@ def merge(self, id, job_config_yaml=None):
         self.id = id
 
         for entry in self.applications:
-            if entry['name'] is None:
+            if entry.name is None:
                 raise aztk.error.AztkError(
-                    "Application specified with no name. Please verify your configuration in job.yaml"
-                )
-            if entry['application'] is None:
+                    "Application specified with no name. Please verify your configuration in job.yaml")
+            if entry.application is None:
                 raise aztk.error.AztkError(
-                    "No path to application specified for {} in job.yaml".
-                    format(entry['name']))
+                    "No path to application specified for {} in job.yaml".format(entry.name))
 
 
 def get_file_if_exists(file):
diff --git a/aztk_cli/spark/endpoints/job/submit.py b/aztk_cli/spark/endpoints/job/submit.py
index 1f3407b0..6dd70990 100644
--- a/aztk_cli/spark/endpoints/job/submit.py
+++ b/aztk_cli/spark/endpoints/job/submit.py
@@ -23,27 +23,6 @@ def execute(args: typing.NamedTuple):
 
     job_conf.merge(args.job_id, args.job_conf)
 
-    aztk_applications = []
-    for application in job_conf.applications:
-        aztk_applications.append(
-            aztk.spark.models.ApplicationConfiguration(
-                name=application.get('name'),
-                application=application.get('application'),
-                application_args=application.get('application_args'),
-                main_class=application.get('main_class'),
-                jars=[],
-                py_files=[],
-                files=[],
-                driver_java_options=application.get('driver_java_options'),
-                driver_library_path=application.get('driver_library_path'),
-                driver_class_path=application.get('driver_class_path'),
-                driver_memory=application.get('driver_memory'),
-                executor_memory=application.get('executor_memory'),
-                driver_cores=application.get('driver_cores'),
-                executor_cores=application.get('executor_cores')
-            )
-        )
-
     # by default, load spark configuration files in .aztk/
     spark_configuration = config.load_aztk_spark_config()
    # overwrite with values in job_conf if they exist
@@ -56,7 +35,7 @@ def execute(args: typing.NamedTuple):
 
     job_configuration = aztk.spark.models.JobConfiguration(
         id=job_conf.id,
-        applications=aztk_applications,
+        applications=job_conf.applications,
         custom_scripts=job_conf.custom_scripts,
         spark_configuration=spark_configuration,
         vm_size=job_conf.vm_size,
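
For reviewers, a minimal sketch of what this refactor enables (not part of the patch): job.yaml application entries are now parsed into ApplicationConfiguration objects inside JobConfig._merge_dict, with jars, py_files, and files passed through instead of being hardcoded to empty lists in the CLI endpoint. The class names and keyword arguments below come from the diff above; every literal value is an illustrative assumption.

    # Sketch only: building by hand the same model objects the refactored
    # _merge_dict now produces from job.yaml. Values are made-up examples.
    import aztk.spark

    app = aztk.spark.models.ApplicationConfiguration(
        name="wordcount",                    # merge() raises AztkError if missing
        application="/code/wordcount.py",    # merge() raises AztkError if missing
        application_args=["wasbs://input/data.txt"],
        main_class=None,
        jars=["/code/dependency.jar"],       # previously hardcoded to [] in the CLI
        py_files=["/code/helpers.zip"],      # previously hardcoded to [] in the CLI
        files=["/code/lookup.csv"],          # previously hardcoded to [] in the CLI
        driver_java_options=None,
        driver_library_path=None,
        driver_class_path=None,
        driver_memory="2g",
        executor_memory="2g",
        driver_cores=1,
        executor_cores=2)

    job = aztk.spark.models.JobConfiguration(
        id="example-job",
        applications=[app],                  # model objects now, not raw dicts
        custom_scripts=None,
        spark_configuration=None,
        vm_size="standard_d2_v2",
        max_dedicated_nodes=2,               # defaults to 0 now instead of None
        max_low_pri_nodes=0)

Because the CLI endpoint now receives ready-made objects, merge() validates entry.name and entry.application as attributes rather than dict keys. The max_dedicated_nodes / max_low_pri_nodes defaults changing from None to 0 keeps node-count arithmetic safe when job.yaml omits them, which is consistent with the mixed_mode() TypeError fix noted in the commit message.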