From 08d67bc05c72ca80396a40907eabece3d7ecf1d9 Mon Sep 17 00:00:00 2001
From: Kai Huang
Date: Tue, 21 Jul 2020 18:29:44 +0800
Subject: [PATCH] Update init_spark_on_yarn (#2587)

* update

* meet review

* update doc

* style
---
 .../dllib/src/bigdl/dllib/utils/nncontext.py | 67 ++++++++++---------
 python/dllib/src/bigdl/dllib/utils/spark.py  | 41 ++++++------
 2 files changed, 57 insertions(+), 51 deletions(-)

diff --git a/python/dllib/src/bigdl/dllib/utils/nncontext.py b/python/dllib/src/bigdl/dllib/utils/nncontext.py
index ec53ba65937..d6bf369fc63 100644
--- a/python/dllib/src/bigdl/dllib/utils/nncontext.py
+++ b/python/dllib/src/bigdl/dllib/utils/nncontext.py
@@ -47,11 +47,11 @@ def init_spark_on_local(cores=2, conf=None, python_location=None, spark_log_leve
 def init_spark_on_yarn(hadoop_conf,
                        conda_name,
-                       num_executor,
+                       num_executors,
                        executor_cores,
                        executor_memory="2g",
-                       driver_memory="1g",
                        driver_cores=4,
+                       driver_memory="1g",
                        extra_executor_memory_for_ray=None,
                        extra_python_lib=None,
                        penv_archive=None,
@@ -61,34 +61,39 @@ def init_spark_on_yarn(hadoop_conf,
                        spark_log_level="WARN",
                        redirect_spark_log=True,
                        jars=None,
-                       spark_conf=None):
+                       conf=None):
     """
-    Create a SparkContext with Zoo configuration on Yarn cluster on "Yarn-client" mode.
-    You should create a conda env and install the python dependencies in that env.
-    Conda env and the python dependencies only need to be installed in the driver machine.
-    It's not necessary create and install those on the whole yarn cluster.
-
-    :param hadoop_conf: path to the yarn configuration folder.
-    :param conda_name: Name of the conda env.
-    :param num_executor: Number of the Executors.
-    :param executor_cores: Cores for each Executor.
-    :param executor_memory: Memory for each Executor.
-    :param driver_memory: Memory for the Driver.
-    :param driver_cores: Number of cores for the Driver.
-    :param extra_executor_memory_for_ray: Memory size for the Ray services.
-    :param extra_python_lib:
-    :param penv_archive: Ideally, program would auto-pack the conda env which is specified by
-        `conda_name`, but you can also pass the path to a packed file in "tar.gz" format here.
-    :param additional_archive: comma seperated additional archives that you want to upload and
-        unpack on executor
-    :param hadoop_user_name: User name for running in yarn cluster. Default value is: root
-    :param spark_yarn_archive conf value for spark.yarn.archive
-    :param spark_log_level: Log level of Spark
-    :param redirect_spark_log: Direct the Spark log to local file or not.
-    :param jars: Comma-separated list of jars to include on the driver and executor classpaths.
-    :param spark_conf: You can append extra spark conf here in key value format.
-        i.e spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}
-    :return: SparkContext
+    Create a SparkContext with Analytics Zoo configurations on a Yarn cluster in yarn-client mode.
+    You only need to create a conda environment and install the python dependencies in that
+    environment beforehand on the driver machine. These dependencies will be automatically
+    packaged and distributed to the whole Yarn cluster.
+
+    :param hadoop_conf: The path to the yarn configuration folder.
+    :param conda_name: The name of the conda environment.
+    :param num_executors: The number of Spark executors.
+    :param executor_cores: The number of cores for each executor.
+    :param executor_memory: The memory for each executor. Default to be '2g'.
+    :param driver_cores: The number of cores for the Spark driver. Default to be 4.
+    :param driver_memory: The memory for the Spark driver. Default to be '1g'.
+    :param extra_executor_memory_for_ray: The extra memory for Ray services. Default to be None.
+    :param extra_python_lib: Extra python files or packages needed for distribution.
+           Default to be None.
+    :param penv_archive: Ideally, the program would auto-pack the conda environment specified by
+           'conda_name', but you can also pass the path to a packed file in "tar.gz" format here.
+           Default to be None.
+    :param additional_archive: Comma-separated list of additional archives to be uploaded and
+           unpacked on executors. Default to be None.
+    :param hadoop_user_name: The user name for running in the yarn cluster. Default to be 'root'.
+    :param spark_yarn_archive: Conf value for setting spark.yarn.archive. Default to be None.
+    :param spark_log_level: The log level for Spark. Default to be 'WARN'.
+    :param redirect_spark_log: Whether to redirect the Spark log to a local file.
+           Default to be True.
+    :param jars: Comma-separated list of jars to be included on the driver and executor classpaths.
+           Default to be None.
+    :param conf: You can append extra conf for Spark in key-value format,
+           e.g. conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}.
+           Default to be None.
+
+    :return: An instance of SparkContext.
     """
     from zoo.util.spark import SparkRunner
     sparkrunner = SparkRunner(spark_log_level=spark_log_level,
@@ -96,7 +101,7 @@ def init_spark_on_yarn(hadoop_conf,
     sc = sparkrunner.init_spark_on_yarn(
         hadoop_conf=hadoop_conf,
         conda_name=conda_name,
-        num_executor=num_executor,
+        num_executors=num_executors,
         executor_cores=executor_cores,
         executor_memory=executor_memory,
         driver_memory=driver_memory,
@@ -108,7 +113,7 @@ def init_spark_on_yarn(hadoop_conf,
         hadoop_user_name=hadoop_user_name,
         spark_yarn_archive=spark_yarn_archive,
         jars=jars,
-        spark_conf=spark_conf)
+        conf=conf)
     return sc

diff --git a/python/dllib/src/bigdl/dllib/utils/spark.py b/python/dllib/src/bigdl/dllib/utils/spark.py
index 9eefe6c87c7..2c4b9472ee7 100644
--- a/python/dllib/src/bigdl/dllib/utils/spark.py
+++ b/python/dllib/src/bigdl/dllib/utils/spark.py
@@ -144,18 +144,18 @@ def init_spark_on_local(self, cores, conf=None, python_location=None):
     def init_spark_on_yarn(self,
                            hadoop_conf,
                            conda_name,
-                           num_executor,
+                           num_executors,
                            executor_cores,
-                           executor_memory="10g",
-                           driver_memory="1g",
+                           executor_memory="2g",
                            driver_cores=4,
+                           driver_memory="1g",
                            extra_executor_memory_for_ray=None,
                            extra_python_lib=None,
                            penv_archive=None,
                            additional_archive=None,
                            hadoop_user_name="root",
                            spark_yarn_archive=None,
-                           spark_conf=None,
+                           conf=None,
                            jars=None):
         os.environ["HADOOP_CONF_DIR"] = hadoop_conf
         os.environ['HADOOP_USER_NAME'] = hadoop_user_name
@@ -166,12 +166,13 @@ def _yarn_opt(jars):
             archive = "{}#{}".format(penv_archive, self.PYTHON_ENV)
             if additional_archive:
                 archive = archive + "," + additional_archive
-            command = " --archives {} --num-executors {} " \
-                      " --executor-cores {} --executor-memory {}". \
-                format(archive, num_executor, executor_cores, executor_memory)
+            command = " --archives {} --driver-cores {} --driver-memory {}" \
+                      " --num-executors {} --executor-cores {} --executor-memory {}". \
+                format(archive, driver_cores, driver_memory,
+                       num_executors, executor_cores, executor_memory)
             if extra_python_lib:
-                command = command + " --py-files {} ".format(extra_python_lib)
+                command = command + " --py-files {}".format(extra_python_lib)
             if jars:
                 command = command + " --jars {}".format(jars)
             return command
@@ -182,7 +183,7 @@ def _submit_opt():
                 "spark.driver.cores": driver_cores,
                 "spark.executor.cores": executor_cores,
                 "spark.executor.memory": executor_memory,
-                "spark.scheduler.minRegisterreResourcesRatio": "1.0"}
+                "spark.scheduler.minRegisteredResourcesRatio": "1.0"}
             if extra_executor_memory_for_ray:
                 conf["spark.executor.memoryOverhead"] = extra_executor_memory_for_ray
             if spark_yarn_archive:
@@ -197,23 +198,23 @@ def _submit_opt():
                 penv_archive = self.pack_penv(conda_name)
                 pack_env = True

-            submit_args, conf = _submit_opt()
+            submit_args, _conf = _submit_opt()

-            if not spark_conf:
-                spark_conf = {}
+            if not conf:
+                conf = {}

             zoo_bigdl_path_on_executor = ":".join(self._assemble_zoo_classpath_for_executor())

-            if "spark.executor.extraClassPath" in spark_conf:
-                spark_conf["spark.executor.extraClassPath"] = "{}:{}".format(
-                    zoo_bigdl_path_on_executor, spark_conf["spark.executor.extraClassPath"])
+            if "spark.executor.extraClassPath" in conf:
+                conf["spark.executor.extraClassPath"] = "{}:{}".format(
+                    zoo_bigdl_path_on_executor, conf["spark.executor.extraClassPath"])
             else:
-                spark_conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
+                conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor

-            spark_conf["spark.executorEnv.PYTHONHOME"] = self.PYTHON_ENV
+            conf["spark.executorEnv.PYTHONHOME"] = self.PYTHON_ENV

-            for item in spark_conf.items():
-                conf[str(item[0])] = str(item[1])
-            sc = self._create_sc(submit_args, conf)
+            for item in conf.items():
+                _conf[str(item[0])] = str(item[1])
+            sc = self._create_sc(submit_args, _conf)
         finally:
             if conda_name and penv_archive and pack_env:
                 os.remove(penv_archive)
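
A minimal usage sketch of the updated public API, assuming the Analytics Zoo-style import (adjust to the package layout of your build); the Hadoop conf path, conda environment name, and resource sizes below are illustrative:

    from zoo import init_spark_on_yarn

    # The conda environment "zoo-env" is assumed to already exist on the
    # driver machine with the python dependencies installed; it is packed
    # and distributed to the Yarn cluster automatically.
    sc = init_spark_on_yarn(hadoop_conf="/opt/hadoop/etc/hadoop",
                            conda_name="zoo-env",
                            num_executors=2,          # renamed from num_executor
                            executor_cores=4,
                            executor_memory="2g",
                            driver_cores=4,
                            driver_memory="1g",
                            # renamed from spark_conf
                            conf={"spark.executor.extraJavaOptions":
                                  "-XX:+PrintGCDetails"})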
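
The updated _yarn_opt now also forwards the driver resources on the submit command line. A self-contained sketch of the resulting argument string, with illustrative values standing in for the real archive name and sizes:

    # Illustrative values; archive is normally "<packed conda env>#<self.PYTHON_ENV>".
    archive = "python_env.tar.gz#python_env"
    driver_cores, driver_memory = 4, "1g"
    num_executors, executor_cores, executor_memory = 2, 4, "2g"

    # The same string construction as in the updated _yarn_opt.
    command = " --archives {} --driver-cores {} --driver-memory {}" \
              " --num-executors {} --executor-cores {} --executor-memory {}". \
        format(archive, driver_cores, driver_memory,
               num_executors, executor_cores, executor_memory)
    print(command)
    # prints (one line):
    #  --archives python_env.tar.gz#python_env --driver-cores 4 --driver-memory 1g
    #  --num-executors 2 --executor-cores 4 --executor-memory 2g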
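
The conf handling above preserves a user-supplied spark.executor.extraClassPath and prepends the assembled zoo/bigdl classpath to it. A self-contained sketch of that merge, with hypothetical jar paths standing in for the result of _assemble_zoo_classpath_for_executor():

    # Hypothetical stand-ins for the assembled classpath and user conf.
    zoo_bigdl_path_on_executor = "zoo.jar:bigdl.jar"
    conf = {"spark.executor.extraClassPath": "/opt/user/libs/*"}

    # Same merge logic as in init_spark_on_yarn above: keep the user value,
    # prepend the zoo/bigdl jars.
    if "spark.executor.extraClassPath" in conf:
        conf["spark.executor.extraClassPath"] = "{}:{}".format(
            zoo_bigdl_path_on_executor, conf["spark.executor.extraClassPath"])
    else:
        conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor

    print(conf["spark.executor.extraClassPath"])
    # zoo.jar:bigdl.jar:/opt/user/libs/*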