Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Bug: Load default Jars for job submission CLI #367

Merged
merged 3 commits on
Feb 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ def _merge_dict(self, config):
self.spark_defaults_conf = self.__convert_to_path(spark_configuration.get('spark_defaults_conf'))
self.spark_env_sh = self.__convert_to_path(spark_configuration.get('spark_env_sh'))
self.core_site_xml = self.__convert_to_path(spark_configuration.get('core_site_xml'))
self.jars = [self.__convert_to_path(jar) for jar in spark_configuration.get('jars')]

def __convert_to_path(self, str_path):
if str_path:
Expand Down Expand Up @@ -409,16 +410,28 @@ def merge(self, id, job_config_yaml=None):
"No path to application specified for {} in job.yaml".format(entry['name']))


def get_file_if_exists(file):
    """Resolve *file* to an existing config path, preferring the local one.

    Checks the local spark conf directory first, then the global config
    directory, and returns the first path that exists on disk (or None
    when the file is present in neither location).
    """
    search_dirs = (
        aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE,  # local .aztk/ conf
        aztk.utils.constants.GLOBAL_CONFIG_PATH,         # global fallback
    )
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return None


def load_aztk_spark_config():
def get_file_if_exists(file, local: bool):
if local:
if os.path.exists(os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)):
return os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)
else:
if os.path.exists(os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, file)):
return os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, file)
return aztk.spark.models.SparkConfiguration(
spark_defaults_conf=get_file_if_exists('spark-defaults.conf'),
jars=load_jars(),
spark_env_sh=get_file_if_exists('spark-env.sh'),
core_site_xml=get_file_if_exists('core-site.xml'))

jars = spark_defaults_conf = spark_env_sh = core_site_xml = None

def load_jars():
jars = None

# try load global
try:
Expand All @@ -428,10 +441,6 @@ def get_file_if_exists(file, local: bool):
except FileNotFoundError:
pass

spark_defaults_conf = get_file_if_exists('spark-defaults.conf', False)
spark_env_sh = get_file_if_exists('spark-env.sh', False)
core_site_xml = get_file_if_exists('core-site.xml', False)

# try load local, overwrite if found
try:
jars_src = os.path.join(
Expand All @@ -440,12 +449,4 @@ def get_file_if_exists(file, local: bool):
except FileNotFoundError:
pass

spark_defaults_conf = get_file_if_exists('spark-defaults.conf', True)
spark_env_sh = get_file_if_exists('spark-env.sh', True)
core_site_xml = get_file_if_exists('core-site.xml', True)

return aztk.spark.models.SparkConfiguration(
spark_defaults_conf=spark_defaults_conf,
jars=jars,
spark_env_sh=spark_env_sh,
core_site_xml=core_site_xml)
return jars
18 changes: 12 additions & 6 deletions cli/spark/endpoints/job/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import aztk.spark
from cli import config, utils, log
from cli.config import JobConfig
from cli.config import JobConfig, load_aztk_spark_config


def setup_parser(parser: argparse.ArgumentParser):
Expand Down Expand Up @@ -43,11 +43,17 @@ def execute(args: typing.NamedTuple):
executor_cores=application.get('executor_cores')
)
)
spark_configuration = aztk.spark.models.SparkConfiguration(
spark_defaults_conf=job_conf.spark_defaults_conf,
spark_env_sh=job_conf.spark_env_sh,
core_site_xml=job_conf.core_site_xml
)

# by default, load spark configuration files in .aztk/
spark_configuration = config.load_aztk_spark_config()
# overwrite with values in job_conf if they exist
if job_conf.spark_defaults_conf:
spark_configuration.spark_defaults_conf = job_conf.spark_defaults_conf
if job_conf.spark_env_sh:
spark_configuration.spark_env_sh = job_conf.spark_env_sh
if job_conf.core_site_xml:
spark_configuration.core_site_xml = job_conf.core_site_xml

job_configuration = aztk.spark.models.JobConfiguration(
id=job_conf.id,
applications=aztk_applications,
Expand Down