Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add in some basic support for Structs #1131

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions integration_tests/src/main/python/cmp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,27 +117,27 @@ def test_isnan(data_gen):
lambda spark : unary_op_df(spark, data_gen).select(
f.isnan(f.col('a'))))

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample, ids=idfn)
def test_dropna_any(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna())

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample, ids=idfn)
def test_dropna_all(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna(how='all'))

#dropna is really a filter along with a test for null, but let's do an explicit filter test too
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample, ids=idfn)
def test_filter(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')))

# coalesce batch happens after a filter, but only if something else happens on the GPU after that
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample, ids=idfn)
def test_filter_with_project(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'))
lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'))

@pytest.mark.parametrize('expr', [f.lit(True), f.lit(False), f.lit(None).cast('boolean')], ids=idfn)
def test_filter_with_lit(expr):
Expand Down
12 changes: 11 additions & 1 deletion integration_tests/src/main/python/data_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,17 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
# Be careful not to make these too large or data generation takes forever
# This is only a few nested array gens, because nesting can be very deep
nested_array_gens_sample = [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)]
ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]

# Some array gens, but not all because of nesting
array_gens_sample = single_level_array_gens + nested_array_gens_sample

# all of the basic types in a single struct
all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])

# Some struct gens, but not all because of nesting
struct_gens_sample = [all_basic_struct_gen,
StructGen([['child0', byte_gen]]),
StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]

3 changes: 2 additions & 1 deletion integration_tests/src/main/python/row_conversion_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_row_conversions():
["e", float_gen], ["f", double_gen], ["g", string_gen], ["h", boolean_gen],
["i", timestamp_gen], ["j", date_gen], ["k", ArrayGen(byte_gen)],
["l", ArrayGen(string_gen)], ["m", ArrayGen(float_gen)],
["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))]]
["n", ArrayGen(boolean_gen)], ["o", ArrayGen(ArrayGen(short_gen))],
["p", StructGen([["c0", byte_gen], ["c1", ArrayGen(byte_gen)]])]]
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gens).selectExpr("*", "a as a_again"))
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ private static HostColumnVector.DataType convertFrom(DataType spark, boolean nul
convertFrom(mapType.keyType(), false),
convertFrom(mapType.valueType(), mapType.valueContainsNull())
)));
} else if (spark instanceof StructType) {
StructType stType = (StructType) spark;
HostColumnVector.DataType[] children = new HostColumnVector.DataType[stType.size()];
StructField[] fields = stType.fields();
for (int i = 0; i < children.length; i++) {
children[i] = convertFrom(fields[i].dataType(), fields[i].nullable());
}
return new HostColumnVector.StructType(nullable, children);
} else {
// Only works for basic types
return new HostColumnVector.BasicType(nullable, getRapidsType(spark));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
Expand Down Expand Up @@ -174,7 +176,15 @@ public final byte[] getBinary(int rowId) {

@Override
public final ColumnVector getChild(int ordinal) {
throw new IllegalStateException("Struct and struct like types are currently not supported by rapids cudf");
if (cachedChildren[ordinal] == null) {
StructType st = (StructType) dataType();
StructField[] fields = st.fields();
for (int i = 0; i < fields.length; i++) {
HostColumnVectorCore tmp = (HostColumnVectorCore) cudfCv.getChildColumnViewAccess(i);
cachedChildren[i] = new RapidsHostColumnVectorCore(fields[i].dataType(), tmp);
}
}
return cachedChildren[ordinal];
}

public HostColumnVectorCore getBase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

override def convertToGpu(child: Expression): GpuExpression =
Expand All @@ -624,6 +625,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

// This is the only NOOP operator. It goes away when things are bound
Expand Down Expand Up @@ -820,6 +822,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

override def convertToGpu(child: Expression): GpuExpression = GpuIsNull(child)
Expand All @@ -831,6 +834,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

override def convertToGpu(child: Expression): GpuExpression = GpuIsNotNull(child)
Expand Down Expand Up @@ -859,7 +863,10 @@ object GpuOverrides {
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowArray = true, allowNesting = true)
GpuOverrides.isSupportedType(t,
allowArray = true,
allowStruct = true,
allowNesting = true)

def convertToGpu(): GpuExpression = {
GpuAtLeastNNonNulls(a.n, childExprs.map(_.convertToGpu()))
Expand Down Expand Up @@ -1825,6 +1832,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

override def convertToGpu(): GpuExec =
Expand Down Expand Up @@ -1913,6 +1921,7 @@ object GpuOverrides {
GpuOverrides.isSupportedType(t,
allowStringMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true)

override def convertToGpu(): GpuExec =
Expand Down
Loading