ColumnarBatch to CachedBatch and back (NVIDIA#1001)
Write ColumnarBatch to CachedBatch and read CachedBatch back into ColumnarBatch.

Signed off via an empty commit.

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
razajafri authored Oct 27, 2020
1 parent 2b17a1e commit 4cdf62a
Showing 3 changed files with 400 additions and 157 deletions.
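
The change teaches the plugin's cached-batch serializer to write a ColumnarBatch into a CachedBatch when a DataFrame is cached and to read it back into a ColumnarBatch when the cache is scanned. A minimal PySpark sketch of the user-facing flow, assuming Spark 3.1.0+ with the RAPIDS Accelerator jar on the classpath (the session settings and DataFrame below are illustrative, not taken from this commit):

from pyspark.sql import SparkSession

# Assumed setup: Spark 3.1.0+ with the RAPIDS Accelerator jar available.
spark = (SparkSession.builder
         .appName("cached-batch-round-trip")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.sql.cache.serializer",
                 "com.nvidia.spark.rapids.shims.spark310.ParquetCachedBatchSerializer")
         .getOrCreate())

df = spark.range(0, 1000).selectExpr("id", "id * 2 AS doubled")
cached = df.cache()
cached.count()                       # materializes the cache: ColumnarBatch -> CachedBatch
cached.selectExpr("doubled").show()  # reads it back: CachedBatch -> ColumnarBatch

The serializer class name is the same one wired into jenkins/spark-tests.sh below.
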
22 changes: 22 additions & 0 deletions integration_tests/src/main/python/cache_test.py
@@ -212,3 +212,25 @@ def test_cache_partial_load(data_gen, enableVectorizedConf):
.cache()
.limit(50).select(f.col("b")), enableVectorizedConf
)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('ts_write', ['TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'])
@pytest.mark.parametrize('enableVectorized', ['true', 'false'], ids=idfn)
@ignore_order
def test_cache_columnar(spark_tmp_path, data_gen, enableVectorized, ts_write):
    data_path_gpu = spark_tmp_path + '/PARQUET_DATA'
    def read_parquet_cached(data_path):
        def write_read_parquet_cached(spark):
            df = unary_op_df(spark, data_gen)
            df.write.mode('overwrite').parquet(data_path)
            cached = spark.read.parquet(data_path).cache()
            cached.count()
            return cached.select(f.col("a"))
        return write_read_parquet_cached
    # spark-rapids doesn't support LEGACY rebase mode for Parquet reads
    conf = {'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
            'spark.sql.legacy.parquet.datetimeRebaseModeInRead': 'CORRECTED',
            'spark.sql.inMemoryColumnarStorage.enableVectorizedReader': enableVectorized,
            'spark.sql.parquet.outputTimestampType': ts_write}

    assert_gpu_and_cpu_are_equal_collect(read_parquet_cached(data_path_gpu), conf)
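
The test relies on assert_gpu_and_cpu_are_equal_collect from the integration-test framework, which is expected to build and collect the same DataFrame on both the CPU and the GPU under the given conf and compare the results. A simplified, self-contained sketch of that idea (not the real harness code; the function name and the spark.rapids.sql.enabled toggle are used here only for illustration):

# Simplified illustration of what the comparison helper is assumed to do:
# collect the DataFrame once with the plugin disabled and once enabled,
# under the supplied conf, then compare the rows.
def gpu_cpu_equal_collect_sketch(spark, build_df, conf):
    def collect_with(rapids_enabled):
        for key, value in conf.items():
            spark.conf.set(key, value)
        spark.conf.set('spark.rapids.sql.enabled', str(rapids_enabled).lower())
        return build_df(spark).collect()

    cpu_rows = collect_with(False)
    gpu_rows = collect_with(True)
    # @ignore_order on the test means row order is not significant here
    assert sorted(map(str, cpu_rows)) == sorted(map(str, gpu_rows))
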
10 changes: 7 additions & 3 deletions jenkins/spark-tests.sh
@@ -47,8 +47,8 @@ $MVN_GET_CMD \
-DgroupId=com.nvidia -DartifactId=rapids-4-spark-integration-tests_$SCALA_BINARY_VER -Dversion=$PROJECT_VER -Dclassifier=pytest -Dpackaging=tar.gz

RAPIDS_INT_TESTS_HOME="$ARTF_ROOT/integration_tests/"
RAPDIS_INT_TESTS_TGZ="$ARTF_ROOT/rapids-4-spark-integration-tests_${SCALA_BINARY_VER}-$PROJECT_VER-pytest.tar.gz"
tar xzf "$RAPDIS_INT_TESTS_TGZ" -C $ARTF_ROOT && rm -f "$RAPDIS_INT_TESTS_TGZ"
RAPIDS_INT_TESTS_TGZ="$ARTF_ROOT/rapids-4-spark-integration-tests_${SCALA_BINARY_VER}-$PROJECT_VER-pytest.tar.gz"
tar xzf "$RAPIDS_INT_TESTS_TGZ" -C $ARTF_ROOT && rm -f "$RAPIDS_INT_TESTS_TGZ"

$MVN_GET_CMD \
-DgroupId=org.apache -DartifactId=spark -Dversion=$SPARK_VER -Dclassifier=bin-hadoop3.2 -Dpackaging=tgz
@@ -61,6 +61,10 @@ tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \
PARQUET_PERF="$WORKSPACE/integration_tests/src/test/resources/parquet_perf"
PARQUET_ACQ="$WORKSPACE/integration_tests/src/test/resources/parquet_acq"
OUTPUT="$WORKSPACE/output"

# spark.sql.cache.serializer conf is ignored for versions prior to 3.1.0
SERIALIZER="--conf spark.sql.cache.serializer=com.nvidia.spark.rapids.shims.spark310.ParquetCachedBatchSerializer"

BASE_SPARK_SUBMIT_ARGS="--master spark://$HOSTNAME:7077 \
--executor-memory 12G \
--total-executor-cores 6 \
@@ -95,6 +99,6 @@ jps

echo "----------------------------START TEST------------------------------------"
rm -rf $OUTPUT
spark-submit $BASE_SPARK_SUBMIT_ARGS $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS
spark-submit $BASE_SPARK_SUBMIT_ARGS $SERIALIZER $MORTGAGE_SPARK_SUBMIT_ARGS $TEST_PARAMS
cd $RAPIDS_INT_TESTS_HOME && spark-submit $BASE_SPARK_SUBMIT_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -v -rfExXs --std_input_path="$WORKSPACE/integration_tests/src/test/resources/"
spark-submit $BASE_SPARK_SUBMIT_ARGS $CUDF_UDF_TEST_ARGS --jars $RAPIDS_TEST_JAR ./runtests.py -m "cudf_udf" -v -rfExXs --cudf_udf
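
As the comment in the script notes, spark.sql.cache.serializer is ignored by Spark versions prior to 3.1.0, so passing it unconditionally is harmless on older builds. A hedged sketch of how a driver-side script could gate the conf on the running version (the helper below is hypothetical and not part of this change):

# Hypothetical helper: request the Parquet cached-batch serializer only on
# Spark 3.1.0+; earlier versions ignore spark.sql.cache.serializer anyway.
def cache_serializer_conf(spark_version):
    major, minor = (int(part) for part in spark_version.split('.')[:2])
    if (major, minor) >= (3, 1):
        return {'spark.sql.cache.serializer':
                'com.nvidia.spark.rapids.shims.spark310.ParquetCachedBatchSerializer'}
    return {}
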