Add support for MapType in selected operators #984

Merged Oct 23, 2020 (20 commits)
Changes from 15 commits
1 change: 1 addition & 0 deletions docs/configs.md
@@ -140,6 +140,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Floor"></a>spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
<a name="sql.expression.GetArrayItem"></a>spark.rapids.sql.expression.GetArrayItem| |Gets the field at `ordinal` in the Array|true|None|
<a name="sql.expression.GetMapValue"></a>spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
<a name="sql.expression.GreaterThan"></a>spark.rapids.sql.expression.GreaterThan|`>`|> operator|true|None|
<a name="sql.expression.GreaterThanOrEqual"></a>spark.rapids.sql.expression.GreaterThanOrEqual|`>=`|>= operator|true|None|
<a name="sql.expression.Hour"></a>spark.rapids.sql.expression.Hour|`hour`|Returns the hour component of the string/timestamp|true|None|
@@ -18,9 +18,8 @@ package com.nvidia.spark.rapids.shims.spark300

import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager

import org.apache.spark.SparkEnv
@@ -138,6 +137,11 @@ class Spark300Shims extends SparkShims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty
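The override above widens the shim's type whitelist so a file scan whose schema contains MAP&lt;STRING, STRING&gt; is no longer forced back onto the CPU. A standalone sketch of the idea (the final branch here merely stands in for GpuOverrides.isSupportedType, which the shim actually falls back to):

import org.apache.spark.sql.types._

def isSupported(t: DataType): Boolean = t match {
  case MapType(StringType, StringType, _) => true  // newly allowed
  case _: MapType => false                         // other map key/value types stay unsupported
  case _ => true                                   // stand-in for GpuOverrides.isSupportedType(t)
}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("m", MapType(StringType, StringType))))
schema.fields.map(_.dataType).forall(isSupported)  // true, so the scan can stay on the GPU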

@@ -255,6 +259,11 @@ class Spark300Shims extends SparkShims {
conf,
conf.isParquetMultiThreadReadEnabled)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.shims.spark300db

import java.time.ZoneId

import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark300.Spark300Shims
import org.apache.spark.sql.rapids.shims.spark300db._
@@ -88,6 +89,11 @@ class Spark300dbShims extends Spark300Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

@@ -21,6 +21,7 @@ import java.time.ZoneId
import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark310.RapidsShuffleManager

@@ -196,6 +197,11 @@ class Spark310Shims extends Spark301Shims {
conf,
conf.isParquetMultiThreadReadEnabled)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -22,12 +22,14 @@
import ai.rapids.cudf.Scalar;
import ai.rapids.cudf.Schema;
import ai.rapids.cudf.Table;

import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.Arrays;
import java.util.List;

/**
@@ -39,7 +41,7 @@
public class GpuColumnVector extends GpuColumnVectorBase {

public static final class GpuColumnarBatchBuilder implements AutoCloseable {
private final ai.rapids.cudf.HostColumnVector.Builder[] builders;
private final ai.rapids.cudf.HostColumnVector.ColumnBuilder[] builders;
private final StructField[] fields;

/**
@@ -54,36 +56,41 @@ public static final class GpuColumnarBatchBuilder implements AutoCloseable {
public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch) {
fields = schema.fields();
int len = fields.length;
builders = new ai.rapids.cudf.HostColumnVector.Builder[len];
builders = new ai.rapids.cudf.HostColumnVector.ColumnBuilder[len];
boolean success = false;
try {
for (int i = 0; i < len; i++) {
StructField field = fields[i];
DType type = getRapidsType(field);
if (type == DType.STRING) {
if (field.dataType() instanceof StringType) {
// If we cannot know the exact size, assume the string is small and allocate
// 8 bytes per row. The buffer of the builder will grow as needed if it is
// too small.
int bufferSize = rows * 8;
if (batch != null) {
ColumnVector cv = batch.column(i);
if (cv instanceof WritableColumnVector) {
WritableColumnVector wcv = (WritableColumnVector)cv;
WritableColumnVector wcv = (WritableColumnVector) cv;
if (!wcv.hasDictionary()) {
bufferSize = wcv.getArrayOffset(rows-1) +
bufferSize = wcv.getArrayOffset(rows - 1) +
wcv.getArrayLength(rows - 1);
}
}
}
builders[i] = ai.rapids.cudf.HostColumnVector.builder(rows, bufferSize);
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, DType.STRING), rows);
} else if (field.dataType() instanceof MapType) {
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.ListType(true,
new HostColumnVector.StructType(true, Arrays.asList(
new HostColumnVector.BasicType(true, DType.STRING),
new HostColumnVector.BasicType(true, DType.STRING)))), rows);
} else {
builders[i] = ai.rapids.cudf.HostColumnVector.builder(type, rows);
DType type = getRapidsType(field);
builders[i] = new ai.rapids.cudf.HostColumnVector.ColumnBuilder(new HostColumnVector.BasicType(true, type), rows);
}
success = true;
}
} finally {
if (!success) {
for (ai.rapids.cudf.HostColumnVector.Builder b: builders) {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
}
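The MapType branch above encodes a Spark MAP&lt;STRING, STRING&gt; column on the host as a cudf LIST of STRUCT&lt;STRING, STRING&gt;, one struct per key/value entry. A minimal sketch of that type mapping in isolation, assuming the cudf-java ColumnBuilder API as used in this diff (the row count is arbitrary):

import java.util.Arrays
import ai.rapids.cudf.{DType, HostColumnVector}

// Children of the struct: one STRING column of keys, one STRING column of values.
val kv: java.util.List[HostColumnVector.DataType] = Arrays.asList(
  new HostColumnVector.BasicType(true, DType.STRING),   // keys
  new HostColumnVector.BasicType(true, DType.STRING))   // values
val mapHostType = new HostColumnVector.ListType(true,
  new HostColumnVector.StructType(true, kv))
val builder = new HostColumnVector.ColumnBuilder(mapHostType, 1024)  // 1024 expected rows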
@@ -92,7 +99,7 @@ public GpuColumnarBatchBuilder(StructType schema, int rows, ColumnarBatch batch)
}
}

public ai.rapids.cudf.HostColumnVector.Builder builder(int i) {
public ai.rapids.cudf.HostColumnVector.ColumnBuilder builder(int i) {
return builders[i];
}

@@ -121,7 +128,7 @@ public ColumnarBatch build(int rows) {

@Override
public void close() {
for (ai.rapids.cudf.HostColumnVector.Builder b: builders) {
for (ai.rapids.cudf.HostColumnVector.ColumnBuilder b: builders) {
if (b != null) {
b.close();
}
@@ -198,17 +205,6 @@ static final DataType getSparkType(DType type) {
}
}

protected static final <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access) {
DType type = access.getDataType();
if (type == DType.LIST) {
try (ColumnViewAccess<T> child = access.getChildColumnViewAccess(0)) {
return new ArrayType(getSparkTypeFrom(child), true);
}
} else {
return getSparkType(type);
}
}

/**
* Create an empty batch from the given format. This should be used very sparingly because
* returning an empty batch from an operator is almost always the wrong thing to do.
@@ -262,6 +258,9 @@ public static final ColumnarBatch from(Table table) {
return from(table, 0, table.getNumberOfColumns());
}

public static final ColumnarBatch from(Table table, List<DataType> colTypes) {
return from(table, colTypes, 0, table.getNumberOfColumns());
}
/**
* Get a ColumnarBatch from a set of columns in the Table. This gets the columns
* starting at startColIndex and going until but not including untilColIndex. This will
@@ -302,13 +301,58 @@ public static final ColumnarBatch from(Table table, int startColIndex, int until
}
}

/**
* Get a ColumnarBatch from a set of columns in the Table. This gets the columns
* starting at startColIndex and going until but not including untilColIndex. This will
* increment the reference count for all columns converted so you will need to close
* both the table that is passed in and the batch returned to be sure that there are no leaks.
*
* @param table a table of vectors
* @param colTypes List of the column data types in the table passed in
* @param startColIndex index of the first vector you want in the final ColumnarBatch
* @param untilColIndex until index of the columns. (ie doesn't include that column num)
* @return a ColumnarBatch of the vectors from the table
*/
public static final ColumnarBatch from(Table table, List<DataType> colTypes, int startColIndex, int untilColIndex) {
assert table != null : "Table cannot be null";
int numColumns = untilColIndex - startColIndex;
ColumnVector[] columns = new ColumnVector[numColumns];
int finalLoc = 0;
boolean success = false;
try {
for (int i = startColIndex; i < untilColIndex; i++) {
columns[finalLoc] = from(colTypes.get(i), table.getColumn(i).incRefCount());
finalLoc++;
}
long rows = table.getRowCount();
if (rows != (int) rows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}
ColumnarBatch ret = new ColumnarBatch(columns, (int)rows);
success = true;
return ret;
} finally {
if (!success) {
for (ColumnVector cv: columns) {
if (cv != null) {
cv.close();
}
}
}
}
}
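A hedged usage sketch of the new overload (the helper below and the table's column layout are hypothetical): callers now pass the Spark output types, so nested columns such as MAP&lt;STRING, STRING&gt; keep their intended Spark type instead of being re-derived from the cudf type alone.

import scala.collection.JavaConverters._
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.{DataType, MapType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Assumes `table` has two columns: a STRING column and a LIST<STRUCT<STRING, STRING>>
// column backing a Spark MAP<STRING, STRING>. Both the table and the returned batch
// still need to be closed by the caller.
def toBatch(table: ai.rapids.cudf.Table): ColumnarBatch = {
  val colTypes: java.util.List[DataType] =
    Seq[DataType](StringType, MapType(StringType, StringType)).asJava
  GpuColumnVector.from(table, colTypes)
}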

/**
* Converts a cudf internal vector to a spark compatible vector. No reference counts
* are incremented so you need to either close the returned value or the input value,
* but not both.
*/
public static final GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
return new GpuColumnVector(getSparkTypeFrom(cudfCv), cudfCv);
return new GpuColumnVector(getSparkType(cudfCv.getType()), cudfCv);
}

public static final GpuColumnVector from(DataType type, ai.rapids.cudf.ColumnVector cudfCv) {
return new GpuColumnVector(type, cudfCv);
}

public static final GpuColumnVector from(Scalar scalar, int count) {
@@ -64,7 +64,7 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer) {
try {
for (int i = 0; i < numColumns; ++i) {
ColumnVector v = table.getColumn(i);
DataType type = getSparkTypeFrom(v);
DataType type = getSparkType(v.getType());
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer);
}
return new ColumnarBatch(columns, (int) rows);
@@ -17,6 +17,10 @@

package com.nvidia.spark.rapids;

import ai.rapids.cudf.DType;
import ai.rapids.cudf.HostColumnVectorCore;
import ai.rapids.cudf.HostMemoryBuffer;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -25,6 +29,9 @@
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.ArrayList;
import java.util.Optional;

/**
* A GPU accelerated version of the Spark ColumnVector.
* Most of the standard Spark APIs should never be called, as they assume that the data
@@ -82,7 +89,7 @@ public RapidsHostColumnVector incRefCount() {

@Override
public void close() {
// Just pass through the reference counting
// Just pass through
cudfCv.close();
}

@@ -143,7 +150,21 @@ public ColumnarArray getArray(int rowId) {

@Override
public ColumnarMap getMap(int ordinal) {
throw new IllegalStateException("Maps are currently not supported by rapids cudf");
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> structHcv = cudfCv.getChildColumnViewAccess(0);
// keys
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> firstHcv = structHcv.getChildColumnViewAccess(0);
HostColumnVectorCore firstHcvCore = (HostColumnVectorCore) firstHcv;
// values
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> secondHcv = structHcv.getChildColumnViewAccess(1);
HostColumnVectorCore secondHcvCore = (HostColumnVectorCore) secondHcv;

RapidsHostColumnVectorCore firstChild = new RapidsHostColumnVectorCore(
GpuColumnVector.getSparkType(firstHcvCore.getType()), firstHcvCore);
RapidsHostColumnVectorCore secondChild = new RapidsHostColumnVectorCore(
GpuColumnVector.getSparkType(secondHcvCore.getType()), secondHcvCore);
int startOffset = cudfCv.getOffsetBuffer().getInt(ordinal * DType.INT32.getSizeInBytes());
return new ColumnarMap(firstChild, secondChild, startOffset,
cudfCv.getOffsetBuffer().getInt((ordinal + 1) * DType.INT32.getSizeInBytes()) - startOffset);
}
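On the host side, the accessor above hands Spark a ColumnarMap whose keys and values are slices of the struct's child columns, delimited by the list offsets read from the offset buffer. A small consumer sketch (the column index, row id, and key are hypothetical, and nulls are ignored):

import org.apache.spark.sql.vectorized.ColumnarBatch

// Looks up `key` in the map column at index 1 of a host-side batch, for row `rowId`.
def lookup(batch: ColumnarBatch, rowId: Int, key: String): Option[String] = {
  val map = batch.column(1).getMap(rowId)          // backed by RapidsHostColumnVector.getMap
  (0 until map.numElements()).collectFirst {
    case i if map.keyArray().getUTF8String(i).toString == key =>
      map.valueArray().getUTF8String(i).toString
  }
}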

@Override