Add command-line interface to benchmarks
Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove committed Sep 28, 2020
1 parent f5482aa commit f1f79d7
Showing 8 changed files with 431 additions and 126 deletions.
24 changes: 24 additions & 0 deletions integration_tests/pom.xml
@@ -77,6 +77,10 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.test.version}</version>
</dependency>
<dependency>
<groupId>org.rogach</groupId>
<artifactId>scallop_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
@@ -122,6 +126,26 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>

<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>

<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>

</plugin>
<!-- disable surefire as we are using scalatest only -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
BenchUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.execution.{InputAdapter, QueryExecution, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.util.QueryExecutionListener

object BenchUtils {
@@ -69,7 +70,7 @@ object BenchUtils {
runBench(
spark,
createDataFrame,
WriteParquet(path, mode, writeOptions),
WriteCsv(path, mode, writeOptions),
queryDescription,
filenameStub,
iterations,
@@ -399,6 +400,12 @@

if (count1 == count2) {
println(s"Both DataFrames contain $count1 rows")

if (!ignoreOrdering && (df1.rdd.getNumPartitions > 1 || df2.rdd.getNumPartitions > 1)) {
throw new IllegalStateException("Cannot run with ignoreOrdering=false because one or " +
"more inputs have multiple partitions")
}

val result1 = collectResults(df1, ignoreOrdering, useIterator)
val result2 = collectResults(df2, ignoreOrdering, useIterator)

@@ -435,7 +442,11 @@
// apply sorting if specified
val resultDf = if (ignoreOrdering) {
// let Spark do the sorting
df.sort(df.columns.map(col): _*)
val nonFloatCols = df.schema.fields
.filter(field => !(field.dataType == DataTypes.FloatType ||
field.dataType == DataTypes.DoubleType))
.map(field => col(field.name))
df.sort(nonFloatCols: _*)
} else {
df
}
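
The sort-key change above excludes float and double columns because their values can differ slightly from run to run; the comparison itself tolerates such differences through the epsilon option introduced in CompareResults below. The tolerance logic is not part of this diff, so the following is only a minimal sketch of an epsilon-based comparison, with an illustrative helper name and rule rather than the plugin's actual implementation:

// Minimal sketch (not part of this commit): an epsilon-based comparison of the
// kind the --epsilon option below configures. The helper name and exact rule
// are illustrative assumptions, not the plugin's actual implementation.
object ApproxCompareSketch {
  def approxEqual(expected: Double, actual: Double, epsilon: Double = 0.00001): Boolean = {
    if (expected.isNaN || actual.isNaN) {
      expected.isNaN && actual.isNaN                 // treat two NaNs as equal for comparison purposes
    } else if (expected == 0.0) {
      math.abs(actual) <= epsilon                    // absolute tolerance near zero
    } else {
      math.abs(expected - actual) / math.abs(expected) <= epsilon  // relative tolerance otherwise
    }
  }

  def main(args: Array[String]): Unit = {
    println(approxEqual(1.000001, 1.000002))         // true with the default epsilon
    println(approxEqual(1.0, 1.1))                   // false
  }
}
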
CompareResults.scala (new file)
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nvidia.spark.rapids.tests.common

import org.rogach.scallop.ScallopConf

import org.apache.spark.sql.SparkSession

/**
* Utility for comparing two csv or parquet files, such as the output from a benchmark, to
* verify that they match, allowing for differences in precision.
*
* This utility is intended to be run via spark-submit.
*
* Example usage:
*
* <pre>
* $SPARK_HOME/bin/spark-submit --jars $SPARK_RAPIDS_PLUGIN_JAR,$CUDF_JAR \
* --master local[*] \
* --class com.nvidia.spark.rapids.tests.common.CompareResults \
* $SPARK_RAPIDS_PLUGIN_INTEGRATION_TEST_JAR \
* --input1 /path/to/result1 \
* --input2 /path/to/result2 \
* --input-format parquet
* </pre>
*/
object CompareResults {
def main(arg: Array[String]): Unit = {
val conf = new Conf(arg)

val spark = SparkSession.builder.appName("CompareResults").getOrCreate()

val (df1, df2) = conf.inputFormat() match {
case "csv" =>
(spark.read.csv(conf.input1()), spark.read.csv(conf.input2()))
case "parquet" =>
(spark.read.parquet(conf.input1()), spark.read.parquet(conf.input2()))
}

BenchUtils.compareResults(
df1,
df2,
conf.ignoreOrdering(),
conf.useIterator(),
conf.maxErrors(),
conf.epsilon())
}
}

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
/** Path to first data set */
val input1 = opt[String](required = true)
/** Path to second data set */
val input2 = opt[String](required = true)
/** Input format (csv or parquet) */
val inputFormat = opt[String](required = true)
/** Sort the data collected from the DataFrames before comparing them. */
val ignoreOrdering = opt[Boolean](required = false, default = Some(false))
/**
* When set to true, use `toLocalIterator` to load one partition at a time into driver memory,
* reducing memory usage at the cost of performance because processing will be single-threaded.
*/
val useIterator = opt[Boolean](required = false, default = Some(false))
/** Maximum number of differences to report */
val maxErrors = opt[Int](required = false, default = Some(10))
/** Allow for differences in precision when comparing floating point values */
val epsilon = opt[Double](required = false, default = Some(0.00001))
verify()
}
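
Scallop derives hyphenated command-line flags from the camelCase option names, which is how inputFormat in the Conf class above lines up with --input-format in the spark-submit example. A self-contained sketch of that pattern, with a demo class and sample arguments that are purely illustrative:

import org.rogach.scallop.ScallopConf

// Illustrative stand-alone example of the ScallopConf pattern used above:
// camelCase vals become hyphenated flags, and verify() enforces required options.
class DemoConf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val input1 = opt[String](required = true)
  val inputFormat = opt[String](required = true)          // parsed from --input-format
  val ignoreOrdering = opt[Boolean](default = Some(false))
  val maxErrors = opt[Int](default = Some(10))
  verify()
}

object DemoConfSketch {
  def main(args: Array[String]): Unit = {
    // Sample arguments; in the real tool they arrive via spark-submit.
    val conf = new DemoConf(Seq("--input1", "/tmp/result1", "--input-format", "parquet"))
    println(conf.input1())         // /tmp/result1
    println(conf.inputFormat())    // parquet
    println(conf.ignoreOrdering()) // false (default)
    println(conf.maxErrors())      // 10 (default)
  }
}
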
TpcdsLikeBench.scala
@@ -17,9 +17,10 @@
package com.nvidia.spark.rapids.tests.tpcds

import com.nvidia.spark.rapids.tests.common.BenchUtils
import org.rogach.scallop.ScallopConf

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.{SaveMode, SparkSession}

object TpcdsLikeBench extends Logging {

@@ -32,6 +33,8 @@ object TpcdsLikeBench extends Logging {
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def collect(
spark: SparkSession,
@@ -55,7 +58,12 @@
*
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param path The path to write the results to
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query.
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def writeCsv(
spark: SparkSession,
@@ -85,7 +93,12 @@
*
* @param spark The Spark session
* @param query The name of the query to run e.g. "q5"
* @param iterations The number of times to run the query.
* @param path The path to write the results to
* @param mode The SaveMode to use when writing the results
* @param writeOptions Write options
* @param iterations The number of times to run the query
* @param gcBetweenRuns Whether to call `System.gc` between iterations to cause Spark to
* call `unregisterShuffle`
*/
def writeParquet(
spark: SparkSession,
@@ -111,15 +124,49 @@
* The main method can be invoked by using spark-submit.
*/
def main(args: Array[String]): Unit = {
val input = args(0)
val conf = new Conf(args)

val spark = SparkSession.builder.appName("TPC-DS Like Bench").getOrCreate()
TpcdsLikeSpark.setupAllParquet(spark, input)

args.drop(1).foreach(query => {
println(s"*** RUNNING TPC-DS QUERY $query")
collect(spark, query)
})
conf.inputFormat().toLowerCase match {
case "parquet" => TpcdsLikeSpark.setupAllParquet(spark, conf.input())
case "csv" => TpcdsLikeSpark.setupAllCSV(spark, conf.input())
case other =>
println(s"Invalid input format: $other")
System.exit(-1)
}

println(s"*** RUNNING TPC-DS QUERY ${conf.query()}")
conf.output.toOption match {
case Some(path) => conf.outputFormat().toLowerCase match {
case "parquet" =>
writeParquet(
spark,
conf.query(),
path,
iterations = conf.iterations())
case "csv" =>
writeCsv(
spark,
conf.query(),
path,
iterations = conf.iterations())
case _ =>
println("Invalid or unspecified output format")
System.exit(-1)
}
case _ =>
collect(spark, conf.query(), conf.iterations())
}
}
}

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val input = opt[String](required = true)
val inputFormat = opt[String](required = true)
val query = opt[String](required = true)
val iterations = opt[Int](default = Some(3))
val output = opt[String](required = false)
val outputFormat = opt[String](required = false)
verify()
}
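
With this Conf in place, the benchmark can be launched through spark-submit with --input, --input-format, --query and, optionally, --output, --output-format and --iterations, as handled by the main method above. It can also be driven programmatically; the sketch below mirrors the collect branch of main, assuming TpcdsLikeSpark sits in the same package (as its unqualified use above suggests) and using a placeholder data path:

import org.apache.spark.sql.SparkSession
import com.nvidia.spark.rapids.tests.tpcds.{TpcdsLikeBench, TpcdsLikeSpark}

// Illustrative driver mirroring what main() does when no --output is given:
// register the TPC-DS-like tables, then run the query and collect the results.
object TpcdsLikeBenchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("TPC-DS Like Bench (example)")
      .getOrCreate()

    // Placeholder path to the TPC-DS-like Parquet data.
    TpcdsLikeSpark.setupAllParquet(spark, "/path/to/tpcds-parquet")

    // Equivalent to the collect(spark, conf.query(), conf.iterations()) branch in main().
    TpcdsLikeBench.collect(spark, "q5", iterations = 3)
  }
}
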

(The remaining changed files, one of which was deleted, are not shown here.)
