diff --git a/nds/PysparkBenchReport.py b/nds/PysparkBenchReport.py
new file mode 100644
index 0000000..5029211
--- /dev/null
+++ b/nds/PysparkBenchReport.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -----
+#
+# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+#
+# You may not use this file except in compliance with the TPC EULA.
+# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+# obtained from using this file do not comply with the TPC-DS Benchmark.
+#
+
+import json
+import os
+import time
+import traceback
+from typing import Callable
+from pyspark.sql import SparkSession
+
+import python_listener
+
+class PysparkBenchReport:
+    """Class to generate a JSON summary report for a benchmark
+    """
+    def __init__(self, spark_session: SparkSession, query_name) -> None:
+        self.spark_session = spark_session
+        self.summary = {
+            'env': {
+                'envVars': {},
+                'sparkConf': {},
+                'sparkVersion': None
+            },
+            'queryStatus': [],
+            'exceptions': [],
+            'startTime': None,
+            'queryTimes': [],
+            'query': query_name,
+        }
+
+    def report_on(self, fn: Callable, *args):
+        """Record a function's running environment and status, excluding sensitive
+        information such as tokens, secrets and passwords, and generate a summary in dict format.
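+
+        A minimal usage sketch (`run_query` and its arguments here are hypothetical):
+
+            report = PysparkBenchReport(spark, 'query1')
+            summary = report.report_on(run_query, spark, query_text)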
+
+        Args:
+            fn (Callable): a function to be recorded
+
+        Returns:
+            dict: summary of the fn
+        """
+        spark_conf = dict(self.spark_session.sparkContext._conf.getAll())
+        env_vars = dict(os.environ)
+        redacted = ["TOKEN", "SECRET", "PASSWORD"]
+        # drop any env var whose name contains one of the redacted keywords
+        filtered_env_vars = dict((k, env_vars[k]) for k in env_vars.keys()
+                                 if not any(r in k for r in redacted))
+        self.summary['env']['envVars'] = filtered_env_vars
+        self.summary['env']['sparkConf'] = spark_conf
+        self.summary['env']['sparkVersion'] = self.spark_session.version
+        listener = None
+        try:
+            listener = python_listener.PythonListener()
+            listener.register()
+        except TypeError as e:
+            print("Not found com.nvidia.spark.rapids.listener.Manager", str(e))
+            listener = None
+        if listener is not None:
+            print("TaskFailureListener is registered.")
+        try:
+            start_time = int(time.time() * 1000)
+            fn(*args)
+            end_time = int(time.time() * 1000)
+            if listener and len(listener.failures) != 0:
+                self.summary['queryStatus'].append("CompletedWithTaskFailures")
+            else:
+                self.summary['queryStatus'].append("Completed")
+        except Exception as e:
+            # print the exception to ease debugging
+            print('ERROR BEGIN')
+            print(e)
+            traceback.print_tb(e.__traceback__)
+            print('ERROR END')
+            end_time = int(time.time() * 1000)
+            self.summary['queryStatus'].append("Failed")
+            self.summary['exceptions'].append(str(e))
+        finally:
+            self.summary['startTime'] = start_time
+            self.summary['queryTimes'].append(end_time - start_time)
+            if listener is not None:
+                listener.unregister()
+        return self.summary
+
+    def write_summary(self, prefix=""):
+        """Write the summary to a JSON file.
+
+        Args:
+            prefix (str, optional): prefix for the output json summary file. Defaults to "".
+        """
+        # The Power BI side retrieves some information from the summary file name, so keep this
+        # file name format for pipeline compatibility
+        filename = prefix + '-' + self.summary['query'] + '-' + str(self.summary['startTime']) + '.json'
+        self.summary['filename'] = filename
+        with open(filename, "w") as f:
+            json.dump(self.summary, f, indent=2)
+
+    def is_success(self):
+        """Check if the query succeeded, queryStatus == Completed
+        """
+        return self.summary['queryStatus'][0] == 'Completed'
\ No newline at end of file
diff --git a/nds/README.md b/nds/README.md
index a933df6..e67bf89 100644
--- a/nds/README.md
+++ b/nds/README.md
@@ -48,13 +48,12 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 an
 
 To help user run NDS, we provide a template to define the main Spark configs for spark-submit command.
 User can use different templates to run NDS with different configurations for different environment.
 
-We create [spark-submit-template](../shared/spark-submit-template), which accepts a template file and
+We create [spark-submit-template](./spark-submit-template), which accepts a template file and
 submit the Spark job with the configs defined in the template file.
 
 Example command to submit via `spark-submit-template` utility:
 
 ```bash
-cd shared
 ./spark-submit-template convert_submit_cpu.template \
 nds_transcode.py raw_sf3k parquet_sf3k report.txt
 ```
 
@@ -74,7 +73,7 @@ For example, we provide below template files to run nds_transcode.py for differe
 * `convert_submit_gpu.template` is for Spark GPU cluster
 
 You need to choose one as your template file and modify it to fit your environment.
-We define a [base.template](../shared/base.template) to help you define some basic variables for your envionment.
+We define a [base.template](./base.template) to help you define some basic variables for your environment.
 And all the other templates will source `base.template` to get the basic variables.
 When you hope to run multiple steps of NDS, you just need to modify `base.template` to fit for your cluster.
@@ -142,8 +141,8 @@ python nds_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
 ### Convert CSV to Parquet or Other data sources
 
 To do the data conversion, the `nds_transcode.py` need to be submitted as a Spark job. User can leverage
-the [spark-submit-template](../shared/spark-submit-template) utility to simplify the submission.
-The utility requires a pre-defined [template file](../shared/convert_submit_gpu.template) where user needs to put
+the [spark-submit-template](./spark-submit-template) utility to simplify the submission.
+The utility requires a pre-defined [template file](./convert_submit_gpu.template) where the user needs to put
 necessary Spark configurations. Either user can submit the `nds_transcode.py` directly to spark with
 arbitrary Spark parameters.
 
@@ -153,7 +152,7 @@ Parquet, Orc, Avro, JSON and Iceberg are supported for output data format at pre
 only Parquet and Orc are supported.
 
 Note: when exporting data from CSV to Iceberg, user needs to set necessary configs for Iceberg in submit template.
-e.g. [convert_submit_cpu_iceberg.template](../shared/convert_submit_cpu_iceberg.template)
+e.g. [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template)
 
 User can also specify `--tables` to convert specific table or tables. See argument details below.
 
@@ -217,7 +216,6 @@ optional arguments:
 Example command to submit via `spark-submit-template` utility:
 
 ```bash
-cd shared
 ./spark-submit-template convert_submit_gpu.template \
 nds_transcode.py raw_sf3k parquet_sf3k report.txt
 ```
 
@@ -346,13 +344,12 @@ optional arguments:
 Example command to submit nds_power.py by spark-submit-template utility:
 
 ```bash
-cd shared
 ./spark-submit-template power_run_gpu.template \
-../nds/nds_power.py \
+./nds_power.py \
 parquet_sf3k \
 /query_0.sql \
 time.csv \
---property_file ../utils/properties/aqe-on.properties
+--property_file ./properties/aqe-on.properties
 ```
 
 User can also use `spark-submit` to submit `nds_power.py` directly.
@@ -361,16 +358,15 @@ To simplify the performance analysis process, the script will create a local CSV
 
 Note the template file must follow the `spark-submit-template` utility as the _first_ argument.
 All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by
-double quotes in the template file. Please follow the format in [power_run_gpu.template](../shared/power_run_gpu.template).
+double quotes in the template file. Please follow the format in [power_run_gpu.template](./power_run_gpu.template).
 
-User can define the `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually varies in the property file.
+User can define the `properties` file like [aqe-on.properties](./properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually vary in the property file.
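+
+A property file is a plain list of `key=value` Spark configs, one per line. As a minimal
+sketch, an AQE property file might contain just the following (hypothetical content):
+
+```properties
+spark.sql.adaptive.enabled=true
+```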
The command above will use `collect()` action to trigger Spark job for each query. It is also supported to save query output to some place for further verification. User can also specify output format e.g. csv, parquet or orc: ```bash -cd shared ./spark-submit-template power_run_gpu.template \ -../nds/nds_power.py \ +./nds_power.py \ parquet_sf3k \ /query_0.sql \ time.csv \ @@ -390,7 +386,7 @@ and _query_2.sql_ and produces csv log for execution time _time_1.csv_ and _time ```bash ./nds-throughput 1,2 \ -../shared/spark-submit-template ../shared/power_run_gpu.template \ +./spark-submit-template ./power_run_gpu.template \ nds_power.py \ parquet_sf3k \ ./nds_query_streams/query_'{}'.sql \ @@ -409,7 +405,7 @@ update operations cannot be done atomically on raw Parquet/Orc files, so we use [Iceberg](https://iceberg.apache.org/) as dataset metadata manager to overcome the issue. Enabling Iceberg requires additional configuration. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/) -for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](../shared/maintenance_iceberg.template) +for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](./maintenance_iceberg.template) The data maintenance queries are in [data_maintenance](./data_maintenance) folder. `DF_*.sql` are DELETE queries while `LF_*.sql` are INSERT queries. diff --git a/nds/base.template b/nds/base.template new file mode 100644 index 0000000..a085d9a --- /dev/null +++ b/nds/base.template @@ -0,0 +1,37 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This is the base template file for the common information about test environment +# including the information about Spark, cluster configuration and the Jar files, +# which are required in the other templates. +# We'll source this base file in all the other templates so that we just need to update +# here once instead of updating in all the templates. +# If you have any different configuration in a specific template, you can override +# the variables in that template. + +export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark} +export SPARK_MASTER=${SPARK_MASTER:-yarn} +export DRIVER_MEMORY=${DRIVER_MEMORY:-10G} +export EXECUTOR_CORES=${EXECUTOR_CORES:-12} +export NUM_EXECUTORS=${NUM_EXECUTORS:-8} +export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G} + +# The NDS listener jar which is built in jvm_listener directory. 
+export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-./jvm_listener/target/nds-benchmark-listener-1.0-SNAPSHOT.jar}
+# The spark-rapids jar which is required when running on GPU
+export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-22.06.0.jar}
+export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip`
\ No newline at end of file
diff --git a/nds/check.py b/nds/check.py
new file mode 100644
index 0000000..8dda74f
--- /dev/null
+++ b/nds/check.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -----
+#
+# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+#
+# You may not use this file except in compliance with the TPC EULA.
+# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+# obtained from using this file do not comply with the TPC-DS Benchmark.
+#
+
+import argparse
+import os
+import sys
+from pathlib import Path
+
+
+def check_version():
+    req_ver = (3, 6)
+    cur_ver = sys.version_info
+    if cur_ver < req_ver:
+        raise Exception('Minimum required Python version is 3.6, but current python version is {}.'
+                        .format(str(cur_ver.major) + '.' + str(cur_ver.minor)) +
+                        ' Please use a supported Python version.')
+
+
+def check_build():
+    """check jar and tpcds executable
+
+    Raises:
+        Exception: the build is not done or broken
+
+    Returns:
+        PosixPath, PosixPath: path of jar and dsdgen executable
+    """
+    # Check if necessary executable or jars are built.
+    # we assume user won't move this script.
+    src_dir = Path(__file__).parent.absolute()
+    jar_path = list(
+        Path(src_dir / 'tpcds-gen/target').rglob("tpcds-gen-*.jar"))
+    tool_path = list(Path(src_dir / 'tpcds-gen/target/tools').rglob("dsdgen"))
+    if jar_path == [] or tool_path == []:
+        raise Exception('Target jar file is not found in `target` folder or dsdgen executable is ' +
+                        'not found in `target/tools` folder. ' +
+                        'Please refer to README document and build this project first.')
+    return jar_path[0], tool_path[0]
+
+
+def get_abs_path(input_path):
+    """receive a user input path and return absolute path of it.
+
+    Args:
+        input_path (str): user's input path
+
+    Returns:
+        str: if the input is absolute, return it; if it's relative path, return the absolute path of
+        it.
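+
+    e.g. with a current working directory of "/home/user" (hypothetical), an input of
+    "data" returns "/home/user/data".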
+    """
+    if Path(input_path).is_absolute():
+        # it's absolute path
+        output_path = input_path
+    else:
+        # it's relative path where this script is executed
+        output_path = os.getcwd() + '/' + input_path
+    return output_path
+
+
+def valid_range(range, parallel):
+    """check that the range input is valid
+
+    Args:
+        range (str): a range specified for a range data generation, e.g. "1,10"
+        parallel (str): string type number for parallelism in TPC-DS data generation, e.g. "20"
+
+    Raises:
+        Exception: error message for invalid range input.
+    """
+    if len(range.split(',')) != 2:
+        msg = 'Invalid range: please specify a range with a comma between start and end. e.g., "1,10".'
+        raise Exception(msg)
+    range_start = int(range.split(',')[0])
+    range_end = int(range.split(',')[1])
+    if range_start < 1 or range_start > range_end or range_end > int(parallel):
+        msg = 'Please provide correct child range: 1 <= range_start <= range_end <= parallel'
+        raise Exception(msg)
+    return range_start, range_end
+
+
+def parallel_value_type(p):
+    """helper function to check the parallel value
+
+    Args:
+        p (str): parallel value
+
+    Raises:
+        argparse.ArgumentTypeError: ArgumentTypeError exception
+
+    Returns:
+        str: parallel in string
+    """
+    if int(p) < 2:
+        raise argparse.ArgumentTypeError("PARALLEL must be >= 2")
+    return p
+
+
+def get_dir_size(start_path):
+    total_size = 0
+    for dirpath, dirnames, filenames in os.walk(start_path):
+        for f in filenames:
+            fp = os.path.join(dirpath, f)
+            # skip if it is symbolic link
+            if not os.path.islink(fp):
+                total_size += os.path.getsize(fp)
+    return total_size
+
+def check_json_summary_folder(json_summary_folder):
+    if json_summary_folder:
+        # prepare a folder to save json summaries of query results
+        if not os.path.exists(json_summary_folder):
+            os.makedirs(json_summary_folder)
+        else:
+            if os.listdir(json_summary_folder):
+                raise Exception(f"json_summary_folder {json_summary_folder} is not empty. " +
+                                "There may already be some json files there. Please clean the folder " +
+                                "or specify another one.")
+
+def check_query_subset_exists(query_dict, subset_list):
+    """check if the query subset exists in the query dictionary"""
+    for q in subset_list:
+        if q not in query_dict.keys():
+            raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.")
+    return True
\ No newline at end of file
diff --git a/nds/convert_submit_cpu.template b/nds/convert_submit_cpu.template
new file mode 100644
index 0000000..7a706b8
--- /dev/null
+++ b/nds/convert_submit_cpu.template
@@ -0,0 +1,27 @@
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +source base.template +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}") \ No newline at end of file diff --git a/shared/convert_submit_cpu_delta.template b/nds/convert_submit_cpu_delta.template similarity index 100% rename from shared/convert_submit_cpu_delta.template rename to nds/convert_submit_cpu_delta.template diff --git a/shared/convert_submit_cpu_iceberg.template b/nds/convert_submit_cpu_iceberg.template similarity index 100% rename from shared/convert_submit_cpu_iceberg.template rename to nds/convert_submit_cpu_iceberg.template diff --git a/nds/convert_submit_gpu.template b/nds/convert_submit_gpu.template new file mode 100644 index 0000000..d970b7e --- /dev/null +++ b/nds/convert_submit_gpu.template @@ -0,0 +1,41 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+source base.template
+export CONCURRENT_GPU_TASKS=${CONCURRENT_GPU_TASKS:-2}
+export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200}
+
+export SPARK_CONF=("--master" "${SPARK_MASTER}"
+                   "--deploy-mode" "client"
+                   "--conf" "spark.driver.memory=${DRIVER_MEMORY}"
+                   "--conf" "spark.executor.cores=${EXECUTOR_CORES}"
+                   "--conf" "spark.executor.instances=${NUM_EXECUTORS}"
+                   "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}"
+                   "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}"
+                   "--conf" "spark.executor.resource.gpu.amount=1"
+                   "--conf" "spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh"
+                   "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin"
+                   "--conf" "spark.rapids.memory.pinnedPool.size=8g"
+                   "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}"
+                   "--conf" "spark.rapids.sql.explain=NOT_ON_GPU"
+                   "--conf" "spark.rapids.sql.incompatibleOps.enabled=true"
+                   "--conf" "spark.rapids.sql.variableFloatAgg.enabled=true"
+                   "--conf" "spark.sql.files.maxPartitionBytes=2g"
+                   "--conf" "spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED"
+                   "--conf" "spark.task.resource.gpu.amount=0.05"
+                   "--files" "$SPARK_HOME/examples/src/main/scripts/getGpusResources.sh"
+                   "--jars" "$SPARK_RAPIDS_PLUGIN_JAR")
\ No newline at end of file
diff --git a/nds/jvm_listener/pom.xml b/nds/jvm_listener/pom.xml
new file mode 100644
index 0000000..ddf0950
--- /dev/null
+++ b/nds/jvm_listener/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+  SPDX-License-Identifier: Apache-2.0
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.nvidia</groupId>
+  <artifactId>nds-benchmark-listener</artifactId>
+  <packaging>jar</packaging>
+  <version>1.0-SNAPSHOT</version>
+
+  <properties>
+    <maven.compiler.source>8</maven.compiler.source>
+    <maven.compiler.target>8</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.12</artifactId>
+      <version>3.1.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.12</artifactId>
+      <version>3.1.2</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/main/scala/</sourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.8.1</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala
new file mode 100644
index 0000000..113f2db
--- /dev/null
+++ b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala
@@ -0,0 +1,24 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.listener
+
+trait Listener {
+  /* Listener interface to be implemented at Python side
+   */
+  def notify(x: Any): Any
+}
diff --git a/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala
new file mode 100644
index 0000000..13a13e6
--- /dev/null
+++ b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala
@@ -0,0 +1,66 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.listener + +import org.apache.spark.SparkContext + +object Manager { + /* Manager class to manage all extra customized listeners. + */ + private var listeners: Map[String, Listener] = Map() + private val spark_listener = new TaskFailureListener() + private var isRegistered = false + + def register(listener: Listener): String = { + /* Note this register method has nothing to do with SparkContext.addSparkListener method. + * This method is only to provide an interface to developers to have a better control over + * all customized listeners. + */ + this.synchronized { + // We register to the spark listener when the first listener is registered. + registerSparkListener() + val uuid = java.util.UUID.randomUUID().toString + listeners = listeners + (uuid -> listener) + uuid + } + } + + def unregister(uuid: String) = { + this.synchronized { + listeners = listeners - uuid + } + } + + def notifyAll(message: String): Unit = { + for { (_, listener) <- listeners } listener.notify(message) + } + + def registerSparkListener() : Unit = { + if (!isRegistered) { + SparkContext.getOrCreate().addSparkListener(spark_listener) + isRegistered = true + } + } + + def unregisterSparkListener() : Unit = { + if (isRegistered) { + SparkContext.getOrCreate().removeSparkListener(spark_listener) + isRegistered = false + } + } +} diff --git a/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala new file mode 100644 index 0000000..791be72 --- /dev/null +++ b/nds/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala @@ -0,0 +1,37 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.listener + +import org.apache.spark.{Success, TaskEndReason} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import scala.collection.mutable.ListBuffer + + +/* A simple listener which captures SparkListenerTaskEnd, + * extracts "reason" of the task. If the reason is not "Success", + * send this reason to python side. 
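+ * The reason is forwarded as a plain string (the TaskEndReason's toString)
+ * through Manager.notifyAll, so Python-side listeners receive it in notify().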
+ */ +class TaskFailureListener extends SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + taskEnd.reason match { + case Success => + case reason => Manager.notifyAll(reason.toString) + } + super.onTaskEnd(taskEnd) + } +} diff --git a/shared/maintenance_delta.template b/nds/maintenance_delta.template similarity index 100% rename from shared/maintenance_delta.template rename to nds/maintenance_delta.template diff --git a/shared/maintenance_iceberg.template b/nds/maintenance_iceberg.template similarity index 100% rename from shared/maintenance_iceberg.template rename to nds/maintenance_iceberg.template diff --git a/nds/nds_gen_data.py b/nds/nds_gen_data.py index b0d23b8..72611ed 100644 --- a/nds/nds_gen_data.py +++ b/nds/nds_gen_data.py @@ -34,13 +34,8 @@ import os import shutil import subprocess -import sys -parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -utils_dir = os.path.join(parent_dir, 'utils') -sys.path.insert(0, utils_dir) - -from check import check_build_nds, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range +from check import check_build, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range check_version() @@ -250,7 +245,7 @@ def generate_data_local(args, range_start, range_end, tool_path): def generate_data(args): - jar_path, tool_path = check_build_nds() + jar_path, tool_path = check_build() range_start = 1 range_end = int(args.parallel) if args.range: diff --git a/nds/nds_gen_query_stream.py b/nds/nds_gen_query_stream.py index a11d31a..1c97658 100644 --- a/nds/nds_gen_query_stream.py +++ b/nds/nds_gen_query_stream.py @@ -35,11 +35,7 @@ import subprocess import sys -parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -utils_dir = os.path.join(parent_dir, 'utils') -sys.path.insert(0, utils_dir) - -from check import check_build_nds, check_version, get_abs_path +from check import check_build, check_version, get_abs_path check_version() @@ -107,7 +103,7 @@ def split_special_query(q): return part_1, part_2 if __name__ == "__main__": - _, tool_path = check_build_nds() + _, tool_path = check_build() parser = parser = argparse.ArgumentParser() parser.add_argument('template_dir', help='directory to find query templates and dialect file.') diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 99faef3..448701d 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -35,12 +35,8 @@ from datetime import datetime import os -parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -utils_dir = os.path.join(parent_dir, 'utils') -sys.path.insert(0, utils_dir) - from pyspark.sql import SparkSession -from python_benchmark_reporter.PysparkBenchReport import PysparkBenchReport +from PysparkBenchReport import PysparkBenchReport from check import check_json_summary_folder, get_abs_path from nds_schema import get_maintenance_schemas diff --git a/nds/nds_power.py b/nds/nds_power.py index a2ceef1..a56cfb9 100644 --- a/nds/nds_power.py +++ b/nds/nds_power.py @@ -37,15 +37,10 @@ import time from collections import OrderedDict from pyspark.sql import SparkSession +from PysparkBenchReport import PysparkBenchReport from pyspark.sql import DataFrame -parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -utils_dir = os.path.join(parent_dir, 'utils') -sys.path.insert(0, utils_dir) - -from python_benchmark_reporter.PysparkBenchReport import PysparkBenchReport from check import check_json_summary_folder, 
check_query_subset_exists, check_version - from nds_gen_query_stream import split_special_query from nds_schema import get_schemas diff --git a/nds/power_run_cpu.template b/nds/power_run_cpu.template new file mode 100644 index 0000000..395fb59 --- /dev/null +++ b/nds/power_run_cpu.template @@ -0,0 +1,32 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source base.template +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.scheduler.minRegisteredResourcesRatio=1.0" + "--conf" "spark.sql.adaptive.enabled=true" + "--conf" "spark.sql.broadcastTimeout=1200" + "--conf" "spark.dynamicAllocation.enabled=false" + "--jars" "$NDS_LISTENER_JAR") \ No newline at end of file diff --git a/shared/power_run_cpu_delta.template b/nds/power_run_cpu_delta.template similarity index 100% rename from shared/power_run_cpu_delta.template rename to nds/power_run_cpu_delta.template diff --git a/shared/power_run_cpu_iceberg.template b/nds/power_run_cpu_iceberg.template similarity index 100% rename from shared/power_run_cpu_iceberg.template rename to nds/power_run_cpu_iceberg.template diff --git a/nds/power_run_gpu.template b/nds/power_run_gpu.template new file mode 100644 index 0000000..453cd2d --- /dev/null +++ b/nds/power_run_gpu.template @@ -0,0 +1,40 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +source base.template +export CONCURRENT_GPU_TASKS=${CONCURRENT_GPU_TASKS:-2} +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.maxResultSize=2GB" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.sql.files.maxPartitionBytes=2gb" + "--conf" "spark.sql.adaptive.enabled=true" + "--conf" "spark.executor.resource.gpu.amount=1" + "--conf" "spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh" + "--conf" "spark.task.resource.gpu.amount=0.0625" + "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin" + "--conf" "spark.rapids.memory.host.spillStorageSize=32G" + "--conf" "spark.rapids.memory.pinnedPool.size=8g" + "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}" + "--files" "$SPARK_HOME/examples/src/main/scripts/getGpusResources.sh" + "--jars" "$SPARK_RAPIDS_PLUGIN_JAR,$NDS_LISTENER_JAR") \ No newline at end of file diff --git a/shared/power_run_gpu_delta.template b/nds/power_run_gpu_delta.template similarity index 100% rename from shared/power_run_gpu_delta.template rename to nds/power_run_gpu_delta.template diff --git a/shared/power_run_gpu_iceberg.template b/nds/power_run_gpu_iceberg.template similarity index 100% rename from shared/power_run_gpu_iceberg.template rename to nds/power_run_gpu_iceberg.template diff --git a/nds/python_listener/PythonListener.py b/nds/python_listener/PythonListener.py new file mode 100644 index 0000000..8e9c949 --- /dev/null +++ b/nds/python_listener/PythonListener.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from pyspark import SparkContext +from pyspark.java_gateway import ensure_callback_server_started + +class PythonListener(object): + package = "com.nvidia.spark.rapids.listener" + + @staticmethod + def get_manager(): + jvm = SparkContext.getOrCreate()._jvm + manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager")) + return manager + + def __init__(self): + self.uuid = None + self.failures = [] + + def notify(self, obj): + """This method is required by Scala Listener interface + we defined above. 
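+        In practice `obj` is the string form of a Spark TaskEndReason,
+        forwarded by Manager.notifyAll when a task ends with a non-Success reason.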
+        """
+        self.failures.append(obj)
+
+    def register(self):
+        ensure_callback_server_started(gw=SparkContext.getOrCreate()._gateway)
+        manager = PythonListener.get_manager()
+        self.uuid = manager.register(self)
+        return self.uuid
+
+    def unregister(self):
+        manager = PythonListener.get_manager()
+        manager.unregister(self.uuid)
+        self.uuid = None
+
+    # should be called after register()
+    def register_spark_listener(self):
+        manager = PythonListener.get_manager()
+        manager.registerSparkListener()
+
+    def unregister_spark_listener(self):
+        manager = PythonListener.get_manager()
+        manager.unregisterSparkListener()
+
+    class Java:
+        implements = ["com.nvidia.spark.rapids.listener.Listener"]
\ No newline at end of file
diff --git a/nds/python_listener/__init__.py b/nds/python_listener/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/nds/spark-submit-template b/nds/spark-submit-template
new file mode 100755
index 0000000..4eda0ae
--- /dev/null
+++ b/nds/spark-submit-template
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -ex
+# e.g.
+# ./spark-submit-template power_run_gpu.template nds_power.py \
+# local_data_parquet/ \
+# ./nds_query_streams/query_0.sql \
+# time.csv
+
+# the first argument must be the template file
+source "$1"
+# build spark-submit command
+MORE_ARGS=("${@:2}")
+CMD=("$SPARK_HOME/bin/spark-submit")
+CMD+=("${SPARK_CONF[@]}")
+CMD+=("${MORE_ARGS[@]}")
+# submit
+"${CMD[@]}"
\ No newline at end of file