Commit

Reverting changes for nds refactoring
bilalbari committed Jun 3, 2024
1 parent ece6971 commit 76647ce
Showing 27 changed files with 769 additions and 40 deletions.
127 changes: 127 additions & 0 deletions nds/PysparkBenchReport.py
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file are derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#

import json
import os
import time
import traceback
from typing import Callable
from pyspark.sql import SparkSession

import python_listener

class PysparkBenchReport:
    """Class to generate a JSON summary report for a benchmark."""
    def __init__(self, spark_session: SparkSession, query_name) -> None:
        self.spark_session = spark_session
        self.summary = {
            'env': {
                'envVars': {},
                'sparkConf': {},
                'sparkVersion': None
            },
            'queryStatus': [],
            'exceptions': [],
            'startTime': None,
            'queryTimes': [],
            'query': query_name,
        }

    def report_on(self, fn: Callable, *args):
        """Record a function's running environment, running status etc., excluding sensitive
        information like tokens, secrets and passwords, and generate a summary in dict format for it.
        Args:
            fn (Callable): a function to be recorded
        Returns:
            dict: summary of the fn
        """
        spark_conf = dict(self.spark_session.sparkContext._conf.getAll())
        env_vars = dict(os.environ)
        # Drop environment variables whose names exactly match an entry in the redacted list.
        redacted = ["TOKEN", "SECRET", "PASSWORD"]
        filtered_env_vars = {k: v for k, v in env_vars.items() if k not in redacted}
        self.summary['env']['envVars'] = filtered_env_vars
        self.summary['env']['sparkConf'] = spark_conf
        self.summary['env']['sparkVersion'] = self.spark_session.version
        listener = None
        try:
            listener = python_listener.PythonListener()
            listener.register()
        except TypeError as e:
            print("Not found com.nvidia.spark.rapids.listener.Manager", str(e))
            listener = None
        if listener is not None:
            print("TaskFailureListener is registered.")
        try:
            start_time = int(time.time() * 1000)
            fn(*args)
            end_time = int(time.time() * 1000)
            if listener and len(listener.failures) != 0:
                self.summary['queryStatus'].append("CompletedWithTaskFailures")
            else:
                self.summary['queryStatus'].append("Completed")
        except Exception as e:
            # print the exception to ease debugging
            print('ERROR BEGIN')
            print(e)
            traceback.print_tb(e.__traceback__)
            print('ERROR END')
            end_time = int(time.time() * 1000)
            self.summary['queryStatus'].append("Failed")
            self.summary['exceptions'].append(str(e))
        finally:
            self.summary['startTime'] = start_time
            self.summary['queryTimes'].append(end_time - start_time)
            if listener is not None:
                listener.unregister()
        return self.summary

    def write_summary(self, prefix=""):
        """Write the JSON summary to a file named after the query and its start time.
        Args:
            prefix (str, optional): prefix for the output json summary file. Defaults to "".
        """
        # Power BI side is retrieving some information from the summary file name, so keep this file
        # name format for pipeline compatibility
        filename = prefix + '-' + self.summary['query'] + '-' + str(self.summary['startTime']) + '.json'
        self.summary['filename'] = filename
        with open(filename, "w") as f:
            json.dump(self.summary, f, indent=2)

    def is_success(self):
        """Check if the query succeeded, i.e. queryStatus == Completed."""
        return self.summary['queryStatus'][0] == 'Completed'
28 changes: 12 additions & 16 deletions nds/README.md
@@ -48,13 +48,12 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 and

To help users run NDS, we provide a template to define the main Spark configs for the spark-submit command.
Users can use different templates to run NDS with different configurations for different environments.
We create [spark-submit-template](../shared/spark-submit-template), which accepts a template file and
We create [spark-submit-template](./spark-submit-template), which accepts a template file and
submit the Spark job with the configs defined in the template file.

Example command to submit via `spark-submit-template` utility:

```bash
cd shared
./spark-submit-template convert_submit_cpu.template \
nds_transcode.py raw_sf3k parquet_sf3k report.txt
```
@@ -74,7 +73,7 @@ For example, we provide below template files to run nds_transcode.py for differe
* `convert_submit_gpu.template` is for Spark GPU cluster

You need to choose one as your template file and modify it to fit your environment.
We define a [base.template](../shared/base.template) to help you define some basic variables for your environment.
We define a [base.template](./base.template) to help you define some basic variables for your environment.
And all the other templates will source `base.template` to get the basic variables.
When you run multiple steps of NDS, you only need to modify `base.template` to fit your cluster.
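For illustration only, a derived template might look like the sketch below; the file name and the override values are hypothetical, not files shipped in this repository:

```bash
# my_submit.template (hypothetical) -- reuse the shared variables,
# then override only what differs for this cluster.
source base.template
export EXECUTOR_MEMORY=32G   # override the value inherited from base.template
export NUM_EXECUTORS=16
```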

@@ -142,8 +141,8 @@ python nds_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
### Convert CSV to Parquet or Other data sources
To do the data conversion, `nds_transcode.py` needs to be submitted as a Spark job. Users can leverage
the [spark-submit-template](../shared/spark-submit-template) utility to simplify the submission.
The utility requires a pre-defined [template file](../shared/convert_submit_gpu.template) where user needs to put
the [spark-submit-template](./spark-submit-template) utility to simplify the submission.
The utility requires a pre-defined [template file](./convert_submit_gpu.template) where user needs to put
necessary Spark configurations. Alternatively, the user can submit `nds_transcode.py` directly to Spark with
arbitrary Spark parameters.
@@ -153,7 +152,7 @@ Parquet, Orc, Avro, JSON and Iceberg are supported for output data format at pre
only Parquet and Orc are supported.
Note: when exporting data from CSV to Iceberg, the user needs to set the necessary configs for Iceberg in the submit template.
e.g. [convert_submit_cpu_iceberg.template](../shared/convert_submit_cpu_iceberg.template)
e.g. [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template)
Users can also specify `--tables` to convert a specific table or tables. See argument details below.
@@ -217,7 +216,6 @@ optional arguments:
Example command to submit via `spark-submit-template` utility:
```bash
cd shared
./spark-submit-template convert_submit_gpu.template \
nds_transcode.py raw_sf3k parquet_sf3k report.txt
```
@@ -346,13 +344,12 @@ optional arguments:
Example command to submit nds_power.py via the spark-submit-template utility:
```bash
cd shared
./spark-submit-template power_run_gpu.template \
../nds/nds_power.py \
../nds_power.py \
parquet_sf3k \
<query_stream_folder>/query_0.sql \
time.csv \
--property_file ../utils/properties/aqe-on.properties
--property_file ./properties/aqe-on.properties
```
User can also use `spark-submit` to submit `nds_power.py` directly.
@@ -361,16 +358,15 @@ To simplify the performance analysis process, the script will create a local CSV
Note the template file must follow the `spark-submit-template` utility as its _first_ argument.
All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by
double quotes in the template file. Please follow the format in [power_run_gpu.template](../shared/power_run_gpu.template).
double quotes in the template file. Please follow the format in [power_run_gpu.template](./power_run_gpu.template).
User can define the `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually vary in the property file.
User can define the `properties` file like [aqe-on.properties](./properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually vary in the property file.
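A property file contains plain `key=value` Spark settings. The sketch below is only an illustration of that format (the file name is made up, and the shipped aqe-on.properties may contain different or additional entries):

```bash
# Illustration only: a minimal property file that enables adaptive query execution.
cat > my-aqe.properties <<'EOF'
spark.sql.adaptive.enabled=true
EOF
```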
The power run command shown earlier uses the `collect()` action to trigger a Spark job for each query. Saving the query output to a chosen location for further verification is also supported, and the output format (e.g. csv, parquet or orc) can be specified:
```bash
cd shared
./spark-submit-template power_run_gpu.template \
../nds/nds_power.py \
./nds_power.py \
parquet_sf3k \
<query_stream_folder>/query_0.sql \
time.csv \
@@ -390,7 +386,7 @@ and _query_2.sql_ and produces csv log for execution time _time_1.csv_ and _time
```bash
./nds-throughput 1,2 \
../shared/spark-submit-template ../shared/power_run_gpu.template \
./spark-submit-template ./power_run_gpu.template \
nds_power.py \
parquet_sf3k \
./nds_query_streams/query_'{}'.sql \
@@ -409,7 +405,7 @@ update operations cannot be done atomically on raw Parquet/Orc files, so we use
[Iceberg](https://iceberg.apache.org/) as dataset metadata manager to overcome the issue.
Enabling Iceberg requires additional configuration. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/)
for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](../shared/maintenance_iceberg.template)
for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](./maintenance_iceberg.template)
The data maintenance queries are in the [data_maintenance](./data_maintenance) folder. `DF_*.sql` are
DELETE queries while `LF_*.sql` are INSERT queries.
37 changes: 37 additions & 0 deletions nds/base.template
@@ -0,0 +1,37 @@
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This is the base template file for the common information about test environment
# including the information about Spark, cluster configuration and the Jar files,
# which are required in the other templates.
# We'll source this base file in all the other templates so that we just need to update
# here once instead of updating in all the templates.
# If you have any different configuration in a specific template, you can override
# the variables in that template.

export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
export SPARK_MASTER=${SPARK_MASTER:-yarn}
export DRIVER_MEMORY=${DRIVER_MEMORY:-10G}
export EXECUTOR_CORES=${EXECUTOR_CORES:-12}
export NUM_EXECUTORS=${NUM_EXECUTORS:-8}
export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G}

# The NDS listener jar which is built in jvm_listener directory.
export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-./jvm_listener/target/nds-benchmark-listener-1.0-SNAPSHOT.jar}
# The spark-rapids jar which is required when running on GPU
export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-22.06.0.jar}
export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip`
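For orientation, the variables exported above map naturally onto `spark-submit` options. The snippet below is only a sketch of that mapping under that assumption; it is not the content of any template in this commit:

```bash
# Illustration only: how the exported variables might feed a spark-submit call.
"${SPARK_HOME}/bin/spark-submit" \
  --master "${SPARK_MASTER}" \
  --driver-memory "${DRIVER_MEMORY}" \
  --executor-cores "${EXECUTOR_CORES}" \
  --num-executors "${NUM_EXECUTORS}" \
  --executor-memory "${EXECUTOR_MEMORY}" \
  --jars "${NDS_LISTENER_JAR},${SPARK_RAPIDS_PLUGIN_JAR}" \
  nds_transcode.py raw_sf3k parquet_sf3k report.txt
```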