Add support for MapType in selected operators #984

Merged: 20 commits, Oct 23, 2020
Changes from 1 commit
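For orientation, a minimal spark-shell style sketch of the kind of job this change targets. The file path, column name and session setup are hypothetical; it only assumes the RAPIDS Accelerator jar is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: a session with the RAPIDS Accelerator enabled and a Parquet
// file that contains a map<string,string> column named `tags`.
val spark = SparkSession.builder()
  .appName("maptype-sketch")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.enabled", "true")
  .getOrCreate()

val events = spark.read.parquet("/data/events.parquet")
events.printSchema()                            // expects tags: map<string,string> among the columns
events.select("tags").show(5, truncate = false) // scan + project of the map column
```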
@@ -20,6 +20,8 @@ import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.GpuOverrides.isSupportedType

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager

@@ -138,6 +140,11 @@ class Spark300Shims extends SparkShims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
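
The two-method override above is the pattern this PR repeats for each operator it extends; the shim classes and GpuOverrides entries below use the same code. In isolation it is just a widened type check. A standalone sketch, with a stand-in for GpuOverrides.isSupportedType (the real whitelist is larger):

```scala
import org.apache.spark.sql.types._

// Stand-in for GpuOverrides.isSupportedType (assumed, simplified whitelist).
def isSupportedBase(t: DataType): Boolean = t match {
  case _: NumericType => true
  case StringType | BooleanType | DateType | TimestampType => true
  case _ => false
}

// The per-operator pattern: additionally accept map<string,string> (with nullable
// values) and route areAllSupportedTypes through the widened check.
def isSupported(t: DataType): Boolean = t match {
  case MapType(StringType, StringType, true) => true
  case _ => isSupportedBase(t)
}

def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)

// areAllSupportedTypes(StringType, MapType(StringType, StringType))  -> true
// areAllSupportedTypes(MapType(StringType, IntegerType))             -> false (falls back to the base check)
```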

@@ -255,6 +262,11 @@ class Spark300Shims extends SparkShims {
conf,
conf.isParquetMultiThreadReadEnabled)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.shims.spark300db

import java.time.ZoneId

import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark300.Spark300Shims
import org.apache.spark.sql.rapids.shims.spark300db._
@@ -88,6 +89,11 @@ class Spark300dbShims extends Spark300Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

@@ -21,6 +21,7 @@ import java.time.ZoneId
import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark310.RapidsShuffleManager

@@ -196,6 +197,11 @@ class Spark310Shims extends Spark301Shims {
conf,
conf.isParquetMultiThreadReadEnabled)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -22,12 +22,14 @@
import ai.rapids.cudf.Scalar;
import ai.rapids.cudf.Schema;
import ai.rapids.cudf.Table;

import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.Arrays;
import java.util.List;

/**
@@ -39,7 +41,7 @@
public class GpuColumnVector extends GpuColumnVectorBase {

public static final class GpuColumnarBatchBuilder implements AutoCloseable {
private final ai.rapids.cudf.HostColumnVector.Builder[] builders;
private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders;
private final StructField[] fields;

/**
@@ -54,7 +56,7 @@ public static final class GpuColumnarBatchBuilder implements AutoCloseable {
public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) {
fields = schema.fields();
int len = fields.length;
builders = new ai.rapids.cudf.HostColumnVector.Builder[len];
builders = new ai.rapids.cudf.HostColumnVector.ColumnBuilder[len];
boolean success = false;
try {
for (int i = 0; i < len; i++) {
@@ -68,31 +70,41 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch)
if (batch != null) {
ColumnVector cv = batch.column(i);
if (cv instanceof WritableColumnVector) {
WritableColumnVector wcv = (WritableColumnVector)cv;
WritableColumnVector wcv = (WritableColumnVector) cv;
if (!wcv.hasDictionary()) {
bufferSize = wcv.getArrayOffset(rows-1) +
bufferSize = wcv.getArrayOffset(rows - 1) +
wcv.getArrayLength(rows - 1);
}
}
}
builders[i] = ai.rapids.cudf.HostColumnVector.builder(rows, bufferSize);
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, DType.STRING), rows);
} else if (type == DType.LIST) {
System.out.println("MAIN ROWS=" + rows);
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true,
new HostColumnVector.StructType(true, Arrays.asList(
new HostColumnVector.BasicType(true, DType.STRING),
new HostColumnVector.BasicType(true, DType.STRING)))), rows);
} else {
builders[i] = ai.rapids.cudf.HostColumnVector.builder(type, rows);
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, type), rows);
}
success = true;
}
} finally {
if (!success) {
for (ai.rapids.cudf.HostColumnVector.Builder b: builders) {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
try {
b.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}

public ai.rapids.cudf.HostColumnVector.Builder builder(int i) {
public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) {
return builders[i];
}
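
The switch above from Builder to ColumnBuilder is what lets a host column carry nested children. Restated as a Scala sketch over the same cudf classes used in this hunk: a Spark map<string,string> column is declared to the host-side builder as a LIST whose child is a STRUCT of two STRING columns (keys and values). The variable names, row estimate, and the explicit element type on Arrays.asList (a Scala-only inference aid, assuming the children share the HostColumnVector.DataType parent) are illustrative assumptions.

```scala
import java.util.Arrays
import ai.rapids.cudf.{DType, HostColumnVector}

// Nested host type for a map<string,string> column, mirroring the constructor
// calls in the diff above: LIST< STRUCT< STRING key, STRING value > >.
val mapHostType = new HostColumnVector.ListType(true,
  new HostColumnVector.StructType(true, Arrays.asList[HostColumnVector.DataType](
    new HostColumnVector.BasicType(true, DType.STRING),    // keys
    new HostColumnVector.BasicType(true, DType.STRING))))  // values

val estimatedRows = 1024 // assumed capacity hint
val builder = new HostColumnVector.ColumnBuilder(mapHostType, estimatedRows)
```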

@@ -121,9 +133,13 @@ public ColumnarBatch build(int rows) {

@Override
public void close() {
for (ai.rapids.cudf.HostColumnVector.Builder b: builders) {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
try {
b.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@@ -150,6 +166,8 @@ private static final DType toRapidsOrNull(DataType type) {
return DType.TIMESTAMP_MICROSECONDS;
} else if (type instanceof StringType) {
return DType.STRING;
} else if (type instanceof MapType) {
return DType.LIST;
}
return null;
}
@@ -202,7 +220,21 @@ protected static final <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access)
DType type = access.getDataType();
if (type == DType.LIST) {
try (ColumnViewAccess<T> child = access.getChildColumnViewAccess(0)) {
return new ArrayType(getSparkTypeFrom(child), true);
DataType ret;
if (child.getDataType() == DType.STRUCT) {
try (ColumnViewAccess<T> firstChild = child.getChildColumnViewAccess(0);
ColumnViewAccess<T> secondChild = child.getChildColumnViewAccess(1)) {
if (firstChild.getDataType() == DType.STRING && secondChild.getDataType() == DType.STRING) {
ret = new MapType(DataTypes.StringType, DataTypes.StringType, true);
} else {
throw new IllegalStateException("Maps with non string type fields is not" +
"supported and something has gone wrong!");
}
}
} else {
ret = new ArrayType(getSparkTypeFrom(child), true);
}
return ret;
}
} else {
return getSparkType(type);
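
Taken together with toRapidsOrNull above, this hunk fixes the two-way correspondence Spark MapType(StringType, StringType) <-> cudf LIST<STRUCT<STRING, STRING>>. A simplified restatement in Scala; the CudfDesc helper types are hypothetical stand-ins for ColumnViewAccess inspection, not plugin code:

```scala
import org.apache.spark.sql.types._

// Hypothetical, simplified description of a cudf column's type tree.
sealed trait CudfDesc
case class Basic(dtype: String) extends CudfDesc
case class Struct(children: Seq[CudfDesc]) extends CudfDesc
case class ListOf(child: CudfDesc) extends CudfDesc

// The decoding rule this hunk adds: a LIST whose child is a STRUCT of two STRINGs
// becomes a Spark map, any other LIST stays an array, and other struct layouts error out.
def sparkTypeOf(desc: CudfDesc): DataType = desc match {
  case ListOf(Struct(Seq(Basic("STRING"), Basic("STRING")))) =>
    MapType(StringType, StringType, valueContainsNull = true)
  case ListOf(Struct(_)) =>
    throw new IllegalStateException("Maps with non string type fields are not supported")
  case ListOf(child) => ArrayType(sparkTypeOf(child), containsNull = true)
  case Basic("STRING") => StringType
  case other => throw new IllegalStateException(s"unhandled in this sketch: $other")
}

// sparkTypeOf(ListOf(Struct(Seq(Basic("STRING"), Basic("STRING")))))
//   -> MapType(StringType, StringType, true)
```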
@@ -17,6 +17,10 @@

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DeviceMemoryBuffer;
import ai.rapids.cudf.DType;
import ai.rapids.cudf.HostMemoryBuffer;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -25,6 +29,8 @@
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.Optional;

/**
* A GPU accelerated version of the Spark ColumnVector.
* Most of the standard Spark APIs should never be called, as they assume that the data
@@ -143,7 +149,62 @@ public ColumnarArray getArray(int rowId) {

@Override
public ColumnarMap getMap(int ordinal) {
throw new IllegalStateException("Maps are currently not supported by rapids cudf");
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> structHcv = cudfCv.getChildColumnViewAccess(0);
// keys
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> firstHcv = structHcv.getChildColumnViewAccess(0);
// values
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> secondHcv = structHcv.getChildColumnViewAccess(1);

//first keys column get all buffers
DeviceMemoryBuffer firstDevData = null;
DeviceMemoryBuffer firstDevOffset = null;
DeviceMemoryBuffer firstDevValid = null;
HostMemoryBuffer firstData = firstHcv.getDataBuffer();
HostMemoryBuffer firstOffset = firstHcv.getOffsetBuffer();
HostMemoryBuffer firstValid = firstHcv.getValidityBuffer();
if (firstData != null) {
firstDevData = DeviceMemoryBuffer.allocate(firstData.getLength());
firstDevData.copyFromHostBuffer(0, firstData, 0, firstData.getLength());
}
if (firstOffset != null) {
firstDevOffset = DeviceMemoryBuffer.allocate(firstOffset.getLength());
firstDevOffset.copyFromHostBuffer(0, firstOffset, 0, firstOffset.getLength());
}
if (firstValid != null) {
firstDevValid = DeviceMemoryBuffer.allocate(firstValid.getLength());
firstDevValid.copyFromHostBuffer(0, firstValid, 0, firstValid.getLength());
}
//second values column get all buffers
DeviceMemoryBuffer secondDevData = null;
DeviceMemoryBuffer secondDevOffset = null;
DeviceMemoryBuffer secondDevValid = null;
HostMemoryBuffer secondData = secondHcv.getDataBuffer();
HostMemoryBuffer secondOffset = secondHcv.getOffsetBuffer();
HostMemoryBuffer secondValid = secondHcv.getValidityBuffer();
if (secondData != null) {
secondDevData = DeviceMemoryBuffer.allocate(secondData.getLength());
secondDevData.copyFromHostBuffer(0, secondData, 0, secondData.getLength());
}
if (secondOffset != null) {
secondDevOffset = DeviceMemoryBuffer.allocate(secondOffset.getLength());
secondDevOffset.copyFromHostBuffer(0, secondOffset, 0, secondOffset.getLength());
}
if (secondValid != null) {
secondDevValid = DeviceMemoryBuffer.allocate(secondValid.getLength());
secondDevValid.copyFromHostBuffer(0, secondValid, 0, secondValid.getLength());
}

ai.rapids.cudf.ColumnVector firstDevCv = new ai.rapids.cudf.ColumnVector(firstHcv.getDataType(),
firstHcv.getRowCount(), Optional.of(firstHcv.getNullCount()),
firstDevData, firstDevValid, firstDevOffset);
ai.rapids.cudf.ColumnVector secondDevCv = new ai.rapids.cudf.ColumnVector(secondHcv.getDataType(),
secondHcv.getRowCount(), Optional.of(secondHcv.getNullCount()),
secondDevData, secondDevValid, secondDevOffset);
GpuColumnVector finFirstCv = GpuColumnVector.from(firstDevCv);
GpuColumnVector finSecondCv = GpuColumnVector.from(secondDevCv);
//TODO: test more that offset and len are right
return new ColumnarMap(finFirstCv.copyToHost(),finSecondCv.copyToHost(),
ordinal* DType.INT32.getSizeInBytes(), (ordinal + 1)* DType.INT32.getSizeInBytes());
}
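
A sketch of how a caller consumes the result: getMap now hands back a standard Spark ColumnarMap whose key and value arrays are string columns. The batch and column-index names below are assumptions; the ColumnarMap accessors are stock Spark API.

```scala
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarMap}

// Print the key/value pairs of the map in one row of a batch. Illustrative only;
// assumes the column at mapColIdx is the map<string,string> column and the row is non-null.
// Values of a nullable map may come back as null UTF8Strings.
def dumpMapRow(batch: ColumnarBatch, rowId: Int, mapColIdx: Int): Unit = {
  val m: ColumnarMap = batch.column(mapColIdx).getMap(rowId)
  val keys = m.keyArray()
  val values = m.valueArray()
  var i = 0
  while (i < m.numElements()) {
    println(s"${keys.getUTF8String(i)} -> ${values.getUTF8String(i)}")
    i += 1
  }
}
```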

@Override
@@ -520,12 +520,22 @@ object GpuOverrides {
expr[Alias](
"Gives a column a name",
(a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def convertToGpu(child: Expression): GpuExpression =
GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata)
}),
expr[AttributeReference](
"References an input column",
(att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
// This is the only NOOP operator. It goes away when things are bound
override def convertToGpu(): Expression = att

@@ -722,6 +732,11 @@ object GpuOverrides {
expr[IsNotNull](
"Checks if a value is not null",
(a, conf, p, r) => new UnaryExprMeta[IsNotNull](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def convertToGpu(child: Expression): GpuExpression = GpuIsNotNull(child)
}),
expr[IsNaN](
@@ -1084,6 +1099,11 @@ object GpuOverrides {
expr[EqualTo](
"Check if the values are equal",
(a, conf, p, r) => new BinaryExprMeta[EqualTo](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuEqualTo(lhs, rhs)
}),
@@ -1419,6 +1439,9 @@ object GpuOverrides {
expr[GetArrayItem](
"Gets the field at `ordinal` in the Array",
(in, conf, p, r) => new GpuGetArrayItemMeta(in, conf, p, r)),
expr[GetMapValue](
"Gets Value from a Map based on a key",
(in, conf, p, r) => new GpuGetMapValueMeta(in, conf, p, r)),
expr[StringLocate](
"Substring search operator",
(in, conf, p, r) => new TernaryExprMeta[StringLocate](in, conf, p, r) {
@@ -1692,6 +1715,11 @@ object GpuOverrides {
"The backend for most select, withColumn and dropColumn statements",
(proj, conf, p, r) => {
new SparkPlanMeta[ProjectExec](proj, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def convertToGpu(): GpuExec =
GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded())
}
@@ -1774,6 +1802,11 @@ object GpuOverrides {
exec[FilterExec](
"The backend for most filter statements",
(filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, true) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def convertToGpu(): GpuExec =
GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded())
}),
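
Finally, a sketch of a query whose physical plan touches the operators extended in this file: the scan, ProjectExec and FilterExec nodes plus the AttributeReference, Alias, IsNotNull, EqualTo and GetMapValue expressions. The session, path and column names are the same hypothetical ones as in the earlier sketch.

```scala
// Assumes `spark` has the RAPIDS plugin enabled and /data/events.parquet carries
// a tags: map<string,string> column (both hypothetical, as above).
val events = spark.read.parquet("/data/events.parquet")

val q = events
  .filter(events("tags").isNotNull)          // FilterExec + IsNotNull on a map column
  .filter(events("tags")("env") === "prod")  // EqualTo over GetMapValue
  .select(events("tags")("user").as("user")) // ProjectExec + GetMapValue + Alias

q.explain() // with the plugin active, the plan should show the Gpu* replacements
```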