Add command-line interface for TPC-* for use with spark-submit #823

Merged: 3 commits merged on Sep 28, 2020
4 changes: 4 additions & 0 deletions integration_tests/pom.xml
@@ -77,6 +77,10 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.test.version}</version>
</dependency>
<dependency>
<groupId>org.rogach</groupId>
<artifactId>scallop_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
com/nvidia/spark/rapids/tests/common/BenchUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.execution.{InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.util.QueryExecutionListener

object BenchUtils {
@@ -69,7 +70,7 @@ object BenchUtils {
runBench(
spark,
createDataFrame,
WriteParquet(path, mode, writeOptions),
WriteCsv(path, mode, writeOptions),
queryDescription,
filenameStub,
iterations,
@@ -399,6 +400,12 @@ object BenchUtils {

if (count1 == count2) {
println(s"Both DataFrames contain $count1 rows")

if (!ignoreOrdering && (df1.rdd.getNumPartitions > 1 || df2.rdd.getNumPartitions > 1)) {
throw new IllegalStateException("Cannot run with ignoreOrdering=false because one or " +
"more inputs have multiple partitions")
}

val result1 = collectResults(df1, ignoreOrdering, useIterator)
val result2 = collectResults(df2, ignoreOrdering, useIterator)

@@ -435,7 +442,11 @@ object BenchUtils {
// apply sorting if specified
val resultDf = if (ignoreOrdering) {
// let Spark do the sorting
df.sort(df.columns.map(col): _*)
val nonFloatCols = df.schema.fields
.filter(field => !(field.dataType == DataTypes.FloatType ||
field.dataType == DataTypes.DoubleType))
.map(field => col(field.name))
df.sort(nonFloatCols: _*)
} else {
df
}
com/nvidia/spark/rapids/tests/common/CompareResults.scala (new file)
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.tests.common

import org.rogach.scallop.ScallopConf

import org.apache.spark.sql.SparkSession

/**
* Utility for comparing two csv or parquet files, such as the output from a benchmark, to
* verify that they match, allowing for differences in precision.
*
* This utility is intended to be run via spark-submit.
*
* Example usage:
*
* <pre>
* $SPARK_HOME/bin/spark-submit --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
* --master local[*] \
* --class com.nvidia.spark.rapids.tests.common.CompareResults \
* $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
* --input1 /path/to/result1 \
* --input2 /path/to/result2 \
* --input-format parquet
* </pre>
*/
object CompareResults {
def main(arg: Array[String]): Unit = {
val conf = new Conf(arg)

val spark = SparkSession.builder.appName("CompareResults").getOrCreate()

val (df1, df2) = conf.inputFormat() match {
case "csv" =>
(spark.read.csv(conf.input1()), spark.read.csv(conf.input2()))
case "parquet" =>
(spark.read.parquet(conf.input1()), spark.read.parquet(conf.input2()))
}

BenchUtils.compareResults(
df1,
df2,
conf.ignoreOrdering(),
conf.useIterator(),
conf.maxErrors(),
conf.epsilon())
}
}

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
/** Path to first data set */
val input1 = opt[String](required = true)
/** Path to second data set */
val input2 = opt[String](required = true)
/** Input format (csv or parquet) */
val inputFormat = opt[String](required = true)
/** Sort the data collected from the DataFrames before comparing them. */
val ignoreOrdering = opt[Boolean](required = false, default = Some(false))
/**
* When set to true, use `toLocalIterator` to load one partition at a time into driver memory,
* reducing memory usage at the cost of performance because processing will be single-threaded.
*/
val useIterator = opt[Boolean](required = false, default = Some(false))
/** Maximum number of differences to report */
val maxErrors = opt[Int](required = false, default = Some(10))
/** Allow for differences in precision when comparing floating point values */
val epsilon = opt[Double](required = false, default = Some(0.00001))
verify()
}
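The remaining Conf options are optional. Scallop derives kebab-case flag names from the camelCase val names (the scaladoc example above already relies on this for --input-format), so a fuller invocation would likely look like the sketch below; the paths and values are placeholders, and --ignore-ordering / --use-iterator are assumed to act as presence flags, which is Scallop's usual handling of Boolean options:

$SPARK_HOME/bin/spark-submit --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
  --master local[*] \
  --class com.nvidia.spark.rapids.tests.common.CompareResults \
  $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
  --input1 /path/to/result1 \
  --input2 /path/to/result2 \
  --input-format parquet \
  --ignore-ordering \
  --max-errors 10 \
  --epsilon 0.0001

Note that --ignore-ordering also controls whether BenchUtils sorts the collected rows (excluding float and double columns) before comparing them, as shown in the BenchUtils.scala hunk above.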
com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeBench.scala
@@ -17,9 +17,10 @@
package com.nvidia.spark.rapids.tests.tpcds

import com.nvidia.spark.rapids.tests.common.BenchUtils
import org.rogach.scallop.ScallopConf

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{SaveMode, SparkSession}

object TpcdsLikeBench extends Logging {

@@ -32,6 +33,8 @@ object TpcdsLikeBench {
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def collect(
spark: SparkSession,
@@ -55,7 +58,12 @@
*
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param path The path to write the results to
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def writeCsv(
spark: SparkSession,
@@ -85,7 +93,12 @@
*
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param path The path to write the results to
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def writeParquet(
spark: SparkSession,
@@ -111,15 +124,49 @@
* The main method can be invoked by using spark-submit.
*/
def main(args: Array[String]): Unit = {
val input = args(0)
val conf = new Conf(args)

val spark = SparkSession.builder.appName("TPC-DS Like Bench").getOrCreate()
TpcdsLikeSpark.setupAllParquet(spark, input)

args.drop(1).foreach(query => {
println(s"*** RUNNING TPC-DS QUERY $query")
collect(spark, query)
})
conf.inputFormat().toLowerCase match {
case "parquet" => TpcdsLikeSpark.setupAllParquet(spark, conf.input())
case "csv" => TpcdsLikeSpark.setupAllCSV(spark, conf.input())
case other =>
println(s"Invalid input format: $other")
System.exit(-1)
}

println(s"*** RUNNING TPC-DS QUERY ${conf.query()}")
conf.output.toOption match {
case Some(path) => conf.outputFormat().toLowerCase match {
case "parquet" =>
writeParquet(
spark,
conf.query(),
path,
iterations = conf.iterations())
case "csv" =>
writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations())
case _ =>
println("Invalid or unspecified output format")
System.exit(-1)
}
case _ =>
collect(spark, conf.query(), conf.iterations())
}
}
}

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val input = opt[String](required = true)
val inputFormat = opt[String](required = true)
val query = opt[String](required = true)
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
verify()
}
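For completeness, a spark-submit invocation of the new TPC-DS benchmark entry point would follow the same pattern as the CompareResults example earlier in this diff. The sketch below assumes the same jar environment variables and Scallop's kebab-case flag derivation; the paths and the query name are placeholders:

$SPARK_HOME/bin/spark-submit --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
  --master local[*] \
  --class com.nvidia.spark.rapids.tests.tpcds.TpcdsLikeBench \
  $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
  --input /path/to/tpcds \
  --input-format parquet \
  --query q5 \
  --iterations 3 \
  --output /path/to/output \
  --output-format parquet

When --output and --output-format are omitted, main falls through to collect(), which runs the query and collects the results on the driver instead of writing them out.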

This file was deleted.
