Commit

Update init_spark_on_yarn (intel-analytics#2587)
* update

* meet review

* update doc

* style
hkvision committed Jul 21, 2020
1 parent 3e9c052 commit 08d67bc
Showing 2 changed files with 57 additions and 51 deletions.
67 changes: 36 additions & 31 deletions python/dllib/src/bigdl/dllib/utils/nncontext.py
@@ -47,11 +47,11 @@ def init_spark_on_local(cores=2, conf=None, python_location=None, spark_log_leve

def init_spark_on_yarn(hadoop_conf,
conda_name,
num_executor,
num_executors,
executor_cores,
executor_memory="2g",
driver_memory="1g",
driver_cores=4,
driver_memory="1g",
extra_executor_memory_for_ray=None,
extra_python_lib=None,
penv_archive=None,
@@ -61,42 +61,47 @@ def init_spark_on_yarn(hadoop_conf,
spark_log_level="WARN",
redirect_spark_log=True,
jars=None,
spark_conf=None):
conf=None):
"""
Create a SparkContext with Zoo configuration on Yarn cluster on "Yarn-client" mode.
You should create a conda env and install the python dependencies in that env.
Conda env and the python dependencies only need to be installed in the driver machine.
It's not necessary create and install those on the whole yarn cluster.
:param hadoop_conf: path to the yarn configuration folder.
:param conda_name: Name of the conda env.
:param num_executor: Number of the Executors.
:param executor_cores: Cores for each Executor.
:param executor_memory: Memory for each Executor.
:param driver_memory: Memory for the Driver.
:param driver_cores: Number of cores for the Driver.
:param extra_executor_memory_for_ray: Memory size for the Ray services.
:param extra_python_lib:
:param penv_archive: Ideally, program would auto-pack the conda env which is specified by
`conda_name`, but you can also pass the path to a packed file in "tar.gz" format here.
:param additional_archive: comma seperated additional archives that you want to upload and
unpack on executor
:param hadoop_user_name: User name for running in yarn cluster. Default value is: root
:param spark_yarn_archive conf value for spark.yarn.archive
:param spark_log_level: Log level of Spark
:param redirect_spark_log: Direct the Spark log to local file or not.
:param jars: Comma-separated list of jars to include on the driver and executor classpaths.
:param spark_conf: You can append extra spark conf here in key value format.
i.e spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}
:return: SparkContext
Create a SparkContext with Analytics Zoo configurations on Yarn cluster for yarn-client mode.
You only need to create a conda environment and install the python dependencies in that
environment beforehand on the driver machine. These dependencies would be automatically
packaged and distributed to the whole Yarn cluster.
:param hadoop_conf: The path to the yarn configuration folder.
:param conda_name: The name of the conda environment.
:param num_executors: The number of Spark executors.
:param executor_cores: The number of cores for each executor.
:param executor_memory: The memory for each executor. Default to be '2g'.
:param driver_cores: The number of cores for the Spark driver. Default to be 4.
:param driver_memory: The memory for the Spark driver. Default to be '1g'.
:param extra_executor_memory_for_ray: The extra memory for Ray services. Default to be None.
:param extra_python_lib: Extra python files or packages needed for distribution.
Default to be None.
:param penv_archive: Ideally, the program would auto-pack the conda environment specified by
'conda_name', but you can also pass the path to a packed file in "tar.gz" format here.
Default to be None.
:param additional_archive: Comma-separated list of additional archives to be uploaded and
unpacked on executors. Default to be None.
:param hadoop_user_name: The user name for running the yarn cluster. Default to be 'root'.
:param spark_yarn_archive: Conf value for setting spark.yarn.archive. Default to be None.
:param spark_log_level: The log level for Spark. Default to be 'WARN'.
:param redirect_spark_log: Whether to redirect the Spark log to local file. Default to be True.
:param jars: Comma-separated list of jars to be included on driver and executor's classpath.
Default to be None.
:param conf: You can append extra conf for Spark in key-value format.
i.e conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}.
Default to be None.
:return: An instance of SparkContext.
"""
from zoo.util.spark import SparkRunner
sparkrunner = SparkRunner(spark_log_level=spark_log_level,
redirect_spark_log=redirect_spark_log)
sc = sparkrunner.init_spark_on_yarn(
hadoop_conf=hadoop_conf,
conda_name=conda_name,
num_executor=num_executor,
num_executors=num_executors,
executor_cores=executor_cores,
executor_memory=executor_memory,
driver_memory=driver_memory,
@@ -108,7 +113,7 @@ def init_spark_on_yarn(hadoop_conf,
hadoop_user_name=hadoop_user_name,
spark_yarn_archive=spark_yarn_archive,
jars=jars,
spark_conf=spark_conf)
conf=conf)
return sc


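For reference, a minimal usage sketch of the updated signature. The import path is inferred from the file path shown above, and all values (Hadoop configuration path, conda environment name, resource sizes) are hypothetical:

from bigdl.dllib.utils.nncontext import init_spark_on_yarn

# Hypothetical values; assumes a conda env named "my_env" already exists on
# the driver machine and that the path points at a valid Yarn configuration.
sc = init_spark_on_yarn(
    hadoop_conf="/opt/hadoop/etc/hadoop",   # folder containing yarn-site.xml, etc.
    conda_name="my_env",                    # packed and shipped to the executors
    num_executors=4,                        # renamed from num_executor in this commit
    executor_cores=8,
    executor_memory="2g",
    driver_cores=4,
    driver_memory="1g",
    conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"})  # renamed from spark_conf
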
41 changes: 21 additions & 20 deletions python/dllib/src/bigdl/dllib/utils/spark.py
@@ -144,18 +144,18 @@ def init_spark_on_local(self, cores, conf=None, python_location=None):
def init_spark_on_yarn(self,
hadoop_conf,
conda_name,
num_executor,
num_executors,
executor_cores,
executor_memory="10g",
driver_memory="1g",
executor_memory="2g",
driver_cores=4,
driver_memory="1g",
extra_executor_memory_for_ray=None,
extra_python_lib=None,
penv_archive=None,
additional_archive=None,
hadoop_user_name="root",
spark_yarn_archive=None,
spark_conf=None,
conf=None,
jars=None):
os.environ["HADOOP_CONF_DIR"] = hadoop_conf
os.environ['HADOOP_USER_NAME'] = hadoop_user_name
@@ -166,12 +166,13 @@ def _yarn_opt(jars):
archive = "{}#{}".format(penv_archive, self.PYTHON_ENV)
if additional_archive:
archive = archive + "," + additional_archive
command = " --archives {} --num-executors {} " \
" --executor-cores {} --executor-memory {}". \
format(archive, num_executor, executor_cores, executor_memory)
command = " --archives {} --driver-cores {} --driver-memory {}" \
" --num-executors {} --executor-cores {} --executor-memory {}". \
format(archive, driver_cores, driver_memory,
num_executors, executor_cores, executor_memory)

if extra_python_lib:
command = command + " --py-files {} ".format(extra_python_lib)
command = command + " --py-files {}".format(extra_python_lib)
if jars:
command = command + " --jars {}".format(jars)
return command
@@ -182,7 +183,7 @@ def _submit_opt():
"spark.driver.cores": driver_cores,
"spark.executor.cores": executor_cores,
"spark.executor.memory": executor_memory,
"spark.scheduler.minRegisterreResourcesRatio": "1.0"}
"spark.scheduler.minRegisteredResourcesRatio": "1.0"}
if extra_executor_memory_for_ray:
conf["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
if spark_yarn_archive:
@@ -197,23 +198,23 @@ def _submit_opt():
penv_archive = self.pack_penv(conda_name)
pack_env = True

submit_args, conf = _submit_opt()
submit_args, _conf = _submit_opt()

if not spark_conf:
spark_conf = {}
if not conf:
conf = {}
zoo_bigdl_path_on_executor = ":".join(self._assemble_zoo_classpath_for_executor())

if "spark.executor.extraClassPath" in spark_conf:
spark_conf["spark.executor.extraClassPath"] = "{}:{}".format(
zoo_bigdl_path_on_executor, spark_conf["spark.executor.extraClassPath"])
if "spark.executor.extraClassPath" in conf:
conf["spark.executor.extraClassPath"] = "{}:{}".format(
zoo_bigdl_path_on_executor, conf["spark.executor.extraClassPath"])
else:
spark_conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor

spark_conf["spark.executorEnv.PYTHONHOME"] = self.PYTHON_ENV
conf["spark.executorEnv.PYTHONHOME"] = self.PYTHON_ENV

for item in spark_conf.items():
conf[str(item[0])] = str(item[1])
sc = self._create_sc(submit_args, conf)
for item in conf.items():
_conf[str(item[0])] = str(item[1])
sc = self._create_sc(submit_args, _conf)
finally:
if conda_name and penv_archive and pack_env:
os.remove(penv_archive)
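
For illustration, a rough sketch of the option string the updated _yarn_opt assembles, using hypothetical values; the real method also appends --py-files and --jars when extra_python_lib or jars are supplied:

# Hypothetical values mirroring the format string in the diff above.
archive = "/tmp/my_env.tar.gz#python_env"   # penv_archive + "#" + self.PYTHON_ENV
command = " --archives {} --driver-cores {} --driver-memory {}" \
          " --num-executors {} --executor-cores {} --executor-memory {}". \
          format(archive, 4, "1g", 4, 8, "2g")
# command == " --archives /tmp/my_env.tar.gz#python_env --driver-cores 4 --driver-memory 1g"
#            " --num-executors 4 --executor-cores 8 --executor-memory 2g"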
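And a standalone sketch of the conf-merging behaviour in the last hunk, again with hypothetical paths (the real zoo_bigdl_path_on_executor is assembled from the installed jar locations):

# Hypothetical inputs.
zoo_bigdl_path_on_executor = "python_env/zoo.jar:python_env/bigdl.jar"
conf = {"spark.executor.extraClassPath": "/opt/extra.jar"}    # user-supplied conf
_conf = {"spark.executor.memory": "2g"}                       # base conf from _submit_opt

# A user classpath is kept, but the zoo/bigdl classpath is prepended to it.
if "spark.executor.extraClassPath" in conf:
    conf["spark.executor.extraClassPath"] = "{}:{}".format(
        zoo_bigdl_path_on_executor, conf["spark.executor.extraClassPath"])
else:
    conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
conf["spark.executorEnv.PYTHONHOME"] = "python_env"           # self.PYTHON_ENV

# Every user entry is stringified and merged into the submit conf.
for key, value in conf.items():
    _conf[str(key)] = str(value)
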
