Another round of fixes for mapping of DataType to DType (NVIDIA#1078)
* Another round of fixes for mapping of DataType to DType

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>

* More cleanup and fixes

* Addressed review comments
revans2 authored Nov 6, 2020
1 parent 857074d commit 5018fbd
Showing 58 changed files with 704 additions and 953 deletions.
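
The thread running through the whole change set: the cudf-DType-to-Spark-DataType inference (getSparkType / getSparkTypeFrom) is removed, and callers now supply the Spark type explicitly when wrapping cudf data; the inference-based overloads that remain are deprecated. A minimal before/after sketch in Scala — the package names and the LongType example are assumptions for illustration, while the from(...) signatures are the ones shown in the diff below:

import ai.rapids.cudf.{ColumnVector => CudfColumnVector}
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.DataTypes

// Before: the Spark type was inferred from the cudf DType (now @Deprecated):
//   val wrapped = GpuColumnVector.from(cudfCol)
// After: the caller states the Spark type the column is expected to have;
// from() asserts that the conversion is allowed.
def wrap(cudfCol: CudfColumnVector): GpuColumnVector =
  GpuColumnVector.from(cudfCol, DataTypes.LongType)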
@@ -175,8 +175,6 @@ trait GpuHashJoin extends GpuExec with HashJoin {

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
@@ -317,9 +315,9 @@ trait GpuHashJoin extends GpuExec with HashJoin {
s" supported")
}
try {
val result = joinIndices.map(joinIndex =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount()))
.toArray[ColumnVector]
val result = joinIndices.zip(output).map { case (joinIndex, outAttr) =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType)
}.toArray[ColumnVector]

new ColumnarBatch(result, joinedTable.getRowCount.toInt)
} finally {
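The GpuHashJoin hunk above switches from wrapping columns by index alone to zipping the join indices with the plan's output attributes, so every wrapped column carries the Spark DataType the plan expects. A self-contained sketch of that pattern in Scala — the helper name gatherJoinOutput and the imports are assumptions for illustration; the calls on Table and GpuColumnVector are the ones used in the hunk:

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Pair each selected cudf column index with the output attribute that
// describes it, and pass the attribute's dataType when wrapping the column.
def gatherJoinOutput(joinedTable: Table,
                     joinIndices: Seq[Int],
                     output: Seq[Attribute]): ColumnarBatch = {
  val cols = joinIndices.zip(output).map { case (idx, attr) =>
    GpuColumnVector.from(joinedTable.getColumn(idx).incRefCount(), attr.dataType)
  }.toArray[ColumnVector]
  new ColumnarBatch(cols, joinedTable.getRowCount.toInt)
}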
@@ -176,8 +176,6 @@ trait GpuHashJoin extends GpuExec with HashJoin {

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
@@ -318,9 +316,9 @@ trait GpuHashJoin extends GpuExec with HashJoin {
s" supported")
}
try {
val result = joinIndices.map(joinIndex =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount()))
.toArray[ColumnVector]
val result = joinIndices.zip(output).map { case (joinIndex, outAttr) =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType)
}.toArray[ColumnVector]

new ColumnarBatch(result, joinedTable.getRowCount.toInt)
} finally {
@@ -176,8 +176,6 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
@@ -318,9 +316,9 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
s" supported")
}
try {
val result = joinIndices.map(joinIndex =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount()))
.toArray[ColumnVector]
val result = joinIndices.zip(output).map { case (joinIndex, outAttr) =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType)
}.toArray[ColumnVector]

new ColumnarBatch(result, joinedTable.getRowCount.toInt)
} finally {
@@ -182,44 +182,6 @@ public static DType getRapidsType(DataType type) {
return result;
}

static DataType getSparkType(DType type) {
switch (type.getTypeId()) {
case BOOL8:
return DataTypes.BooleanType;
case INT8:
return DataTypes.ByteType;
case INT16:
return DataTypes.ShortType;
case INT32:
return DataTypes.IntegerType;
case INT64:
return DataTypes.LongType;
case FLOAT32:
return DataTypes.FloatType;
case FLOAT64:
return DataTypes.DoubleType;
case TIMESTAMP_DAYS:
return DataTypes.DateType;
case TIMESTAMP_MICROSECONDS:
return DataTypes.TimestampType;
case STRING:
return DataTypes.StringType;
default:
throw new IllegalArgumentException(type + " is not supported by spark yet.");
}
}

protected static <T> DataType getSparkTypeFrom(ColumnViewAccess<T> access) {
DType type = access.getDataType();
if (type == DType.LIST) {
try (ColumnViewAccess<T> child = access.getChildColumnViewAccess(0)) {
return new ArrayType(getSparkTypeFrom(child), true);
}
} else {
return getSparkType(type);
}
}

/**
* Create an empty batch from the given format. This should be used very sparingly because
* returning an empty batch from an operator is almost always the wrong thing to do.
@@ -271,8 +233,8 @@ private static StructType structFromAttributes(List<Attribute> format) {
}

/**
* Convert a spark schema into a cudf schema
* @param input the spark schema to convert
* Convert a Spark schema into a cudf schema
* @param input the Spark schema to convert
* @return the cudf schema
*/
public static Schema from(StructType input) {
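The context line above is the Spark-to-cudf schema conversion whose javadoc this hunk retouches. A minimal usage sketch, assuming GpuColumnVector lives in com.nvidia.spark.rapids and using the public static Schema from(StructType) shown here:

import ai.rapids.cudf.Schema
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.{DataTypes, StructType}

// Build a Spark schema and convert it into the equivalent cudf Schema.
val sparkSchema = new StructType()
  .add("id", DataTypes.LongType)
  .add("name", DataTypes.StringType)
val cudfSchema: Schema = GpuColumnVector.from(sparkSchema)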
Expand Down Expand Up @@ -328,7 +290,7 @@ public static ColumnarBatch from(Table table, DataType[] colTypes) {
private static <T> boolean typeConversionAllowed(ColumnViewAccess<T> cv, DataType colType) {
DType dt = cv.getDataType();
if (!dt.isNestedType()) {
return getSparkType(dt).equals(colType);
return getRapidsType(colType).equals(dt);
}
if (colType instanceof MapType) {
MapType mType = (MapType) colType;
@@ -453,22 +415,18 @@ assert typeConversionAllowed(table, colTypes) : "Type conversion is not allowed
}

/**
* Converts a cudf internal vector to a spark compatible vector. No reference counts
* Converts a cudf internal vector to a Spark compatible vector. No reference counts
* are incremented so you need to either close the returned value or the input value,
* but not both.
* @deprecated use the version that takes a data type
*/
@Deprecated
public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv) {
return new GpuColumnVector(getSparkTypeFrom(cudfCv), cudfCv);
}

public static GpuColumnVector from(ai.rapids.cudf.ColumnVector cudfCv, DataType type) {
assert typeConversionAllowed(cudfCv, type) : "Type conversion is not allowed from " + cudfCv +
" to " + type;
return new GpuColumnVector(type, cudfCv);
}

public static GpuColumnVector from(Scalar scalar, int count) {
return from(ai.rapids.cudf.ColumnVector.fromScalar(scalar, count));
public static GpuColumnVector from(Scalar scalar, int count, DataType sparkType) {
return from(ai.rapids.cudf.ColumnVector.fromScalar(scalar, count), sparkType);
}

/**
@@ -486,7 +444,7 @@ public static ai.rapids.cudf.ColumnVector[] extractBases(ColumnarBatch batch) {
}

/**
* Get the underlying spark compatible columns from the batch. This does not increment any
* Get the underlying Spark compatible columns from the batch. This does not increment any
* reference counts so if you want to use these columns after the batch is closed
* you will need to do that on your own.
*/
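Note the direction flip in typeConversionAllowed earlier in this file: instead of mapping the cudf DType back to a Spark type, the declared Spark DataType is mapped forward to a cudf DType with getRapidsType and compared against the column's actual type. A sketch of the non-nested case, assuming the public getRapidsType(DataType) mapping kept at the top of the file:

import ai.rapids.cudf.DType
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.{DataType, DataTypes}

// Map the declared Spark type to its cudf DType and compare it with the
// DType the column actually has.
def simpleTypeMatches(actual: DType, declared: DataType): Boolean =
  GpuColumnVector.getRapidsType(declared).equals(actual)

// Example: an INT32 cudf column only matches Spark's IntegerType.
// simpleTypeMatches(DType.INT32, DataTypes.IntegerType) // true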
@@ -46,43 +46,6 @@ public static ColumnarBatch from(ContiguousTable contigTable, DataType[] colType
return from(table, buffer, colTypes);
}

/**
* Get a ColumnarBatch from a set of columns in a table, and the corresponding device buffer,
* which backs such columns. The resulting batch is composed of columns which are instances of
* GpuColumnVectorFromBuffer. This will increment the reference count for all columns
* converted so you will need to close both the table that is passed in and the batch
* returned to be sure that there are no leaks.
*
* @param table a table with columns at offsets of `buffer`
* @param buffer a device buffer that packs data for columns in `table`
* @return batch of GpuColumnVectorFromBuffer instances derived from the table and buffer
* @deprecated spark data types must be provided with it.
*/
@Deprecated
public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer) {
long rows = table.getRowCount();
if (rows != (int) rows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}
int numColumns = table.getNumberOfColumns();
GpuColumnVector[] columns = new GpuColumnVector[numColumns];
try {
for (int i = 0; i < numColumns; ++i) {
ColumnVector v = table.getColumn(i);
DataType type = getSparkType(v.getType());
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer);
}
return new ColumnarBatch(columns, (int) rows);
} catch (Exception e) {
for (GpuColumnVector v : columns) {
if (v != null) {
v.close();
}
}
throw e;
}
}

/**
* Get a ColumnarBatch from a set of columns in a table, and the corresponding device buffer,
* which backs such columns. The resulting batch is composed of columns which are instances of
@@ -108,7 +71,7 @@ public static ColumnarBatch from(Table table, DeviceMemoryBuffer buffer, DataTyp
try {
for (int i = 0; i < numColumns; ++i) {
ColumnVector v = table.getColumn(i);
DataType type = getSparkType(v.getType());
DataType type = colTypes[i];
columns[i] = new GpuColumnVectorFromBuffer(type, v.incRefCount(), buffer);
}
return new ColumnarBatch(columns, (int) rows);
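GpuColumnVectorFromBuffer likewise loses its type-inferring overload: the caller must now pass one Spark DataType per column. A minimal sketch, assuming the package com.nvidia.spark.rapids and the from(ContiguousTable, DataType[]) overload shown at the top of this file, with the two example column types as assumptions:

import ai.rapids.cudf.ContiguousTable
import com.nvidia.spark.rapids.GpuColumnVectorFromBuffer
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.apache.spark.sql.vectorized.ColumnarBatch

// One Spark DataType per column; nothing is inferred from the cudf types.
def toBatch(contig: ContiguousTable): ColumnarBatch = {
  val colTypes: Array[DataType] = Array(DataTypes.LongType, DataTypes.StringType)
  GpuColumnVectorFromBuffer.from(contig, colTypes)
}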
@@ -81,48 +81,13 @@ public static long[] getUncompressedColumnSizes(ColumnarBatch batch) {
return sizes;
}

/**
* Build a columnar batch from a compressed data buffer and specified table metadata
* NOTE: The data remains compressed and cannot be accessed directly from the columnar batch.
* @deprecated use the version that takes the spark data types.
*/
@Deprecated
public static ColumnarBatch from(DeviceMemoryBuffer compressedBuffer, TableMeta tableMeta) {
long rows = tableMeta.rowCount();
if (rows != (int) rows) {
throw new IllegalStateException("Cannot support a batch larger that MAX INT rows");
}

ColumnMeta columnMeta = new ColumnMeta();
int numColumns = tableMeta.columnMetasLength();
ColumnVector[] columns = new ColumnVector[numColumns];
try {
for (int i = 0; i < numColumns; ++i) {
tableMeta.columnMetas(columnMeta, i);
DType dtype = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale());
DataType type = GpuColumnVector.getSparkType(dtype);
compressedBuffer.incRefCount();
columns[i] = new GpuCompressedColumnVector(type, compressedBuffer, tableMeta);
}
} catch (Throwable t) {
for (int i = 0; i < numColumns; ++i) {
if (columns[i] != null) {
columns[i].close();
}
}
throw t;
}

return new ColumnarBatch(columns, (int) rows);
}

/**
* This should only ever be called from an assertion.
*/
private static boolean typeConversionAllowed(ColumnMeta columnMeta, DataType colType) {
DType dt = DType.fromNative(columnMeta.dtypeId(), columnMeta.dtypeScale());
if (!dt.isNestedType()) {
return GpuColumnVector.getSparkType(dt).equals(colType);
return GpuColumnVector.getRapidsType(colType).equals(dt);
}
if (colType instanceof MapType) {
MapType mType = (MapType) colType;
@@ -23,15 +23,13 @@

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.ArrayList;
import java.util.Optional;

/**
* A GPU accelerated version of the Spark ColumnVector.
* Most of the standard Spark APIs should never be called, as they assume that the data
@@ -150,6 +148,7 @@ public ColumnarArray getArray(int rowId) {

@Override
public ColumnarMap getMap(int ordinal) {
MapType mt = (MapType) dataType();
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> structHcv = cudfCv.getChildColumnViewAccess(0);
// keys
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> firstHcv = structHcv.getChildColumnViewAccess(0);
@@ -158,10 +157,8 @@ public ColumnarMap getMap(int ordinal) {
ai.rapids.cudf.ColumnViewAccess<HostMemoryBuffer> secondHcv = structHcv.getChildColumnViewAccess(1);
HostColumnVectorCore secondHcvCore = (HostColumnVectorCore) secondHcv;

RapidsHostColumnVectorCore firstChild = new RapidsHostColumnVectorCore(
GpuColumnVector.getSparkType(firstHcvCore.getType()), firstHcvCore);
RapidsHostColumnVectorCore secondChild = new RapidsHostColumnVectorCore(
GpuColumnVector.getSparkType(secondHcvCore.getType()), secondHcvCore);
RapidsHostColumnVectorCore firstChild = new RapidsHostColumnVectorCore(mt.keyType(), firstHcvCore);
RapidsHostColumnVectorCore secondChild = new RapidsHostColumnVectorCore(mt.valueType(), secondHcvCore);
int startOffset = cudfCv.getOffsetBuffer().getInt(ordinal * DType.INT32.getSizeInBytes());
return new ColumnarMap(firstChild, secondChild, startOffset,
cudfCv.getOffsetBuffer().getInt((ordinal + 1) * DType.INT32.getSizeInBytes()) - startOffset);
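In getMap above, the Spark types for the key and value children now come from the column's declared MapType rather than from the cudf child types. A small sketch of that bookkeeping; the comment on the cudf layout reflects the child accesses shown in the hunk:

import org.apache.spark.sql.types.{DataTypes, MapType}

// A Spark map column is backed in cudf by a list of key/value entries; the
// wrappers for the two children are now typed from the declared MapType.
val mt = MapType(DataTypes.StringType, DataTypes.IntegerType)
val keySparkType = mt.keyType     // StringType, for the first child (keys)
val valueSparkType = mt.valueType // IntegerType, for the second child (values)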
@@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -31,15 +31,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*/
class ColumnarPartitionReaderWithPartitionValues(
fileReader: PartitionReader[ColumnarBatch],
partitionValues: Array[Scalar]) extends PartitionReader[ColumnarBatch] {
partitionValues: Array[Scalar],
partValueTypes: Array[DataType]) extends PartitionReader[ColumnarBatch] {
override def next(): Boolean = fileReader.next()

override def get(): ColumnarBatch = {
if (partitionValues.isEmpty) {
fileReader.get()
} else {
val fileBatch: ColumnarBatch = fileReader.get()
ColumnarPartitionReaderWithPartitionValues.addPartitionValues(fileBatch, partitionValues)
ColumnarPartitionReaderWithPartitionValues.addPartitionValues(fileBatch,
partitionValues, partValueTypes)
}
}

@@ -49,13 +51,14 @@ class ColumnarPartitionReaderWithPartitionValues(
}
}

object ColumnarPartitionReaderWithPartitionValues {
object ColumnarPartitionReaderWithPartitionValues extends Arm {
def newReader(partFile: PartitionedFile,
baseReader: PartitionReader[ColumnarBatch],
partitionSchema: StructType): PartitionReader[ColumnarBatch] = {
val partitionValues = partFile.partitionValues.toSeq(partitionSchema)
val partitionScalars = createPartitionValues(partitionValues, partitionSchema)
new ColumnarPartitionReaderWithPartitionValues(baseReader, partitionScalars)
new ColumnarPartitionReaderWithPartitionValues(baseReader, partitionScalars,
GpuColumnVector.extractTypes(partitionSchema))
}

def createPartitionValues(
@@ -69,10 +72,11 @@ object ColumnarPartitionReaderWithPartitionValues {

def addPartitionValues(
fileBatch: ColumnarBatch,
partitionValues: Array[Scalar]): ColumnarBatch = {
partitionValues: Array[Scalar],
sparkTypes: Array[DataType]): ColumnarBatch = {
var partitionColumns: Array[GpuColumnVector] = null
try {
partitionColumns = buildPartitionColumns(fileBatch.numRows, partitionValues)
partitionColumns = buildPartitionColumns(fileBatch.numRows, partitionValues, sparkTypes)
val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column)
val resultCols = fileBatchCols ++ partitionColumns
val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows)
@@ -91,13 +95,14 @@

private def buildPartitionColumns(
numRows: Int,
partitionValues: Array[Scalar]): Array[GpuColumnVector] = {
partitionValues: Array[Scalar],
sparkTypes: Array[DataType]): Array[GpuColumnVector] = {
var succeeded = false
val result = new Array[GpuColumnVector](partitionValues.length)
try {
for (i <- result.indices) {
result(i) = GpuColumnVector.from(ai.rapids.cudf.ColumnVector.fromScalar(partitionValues(i),
numRows))
result(i) = GpuColumnVector.from(
ai.rapids.cudf.ColumnVector.fromScalar(partitionValues(i), numRows), sparkTypes(i))
}
succeeded = true
result
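ColumnarPartitionReaderWithPartitionValues now threads the partition columns' Spark types (taken from the partition schema via GpuColumnVector.extractTypes) down to where the scalar partition values are expanded into columns. A minimal sketch of that expansion, assuming the from(cudf ColumnVector, DataType) overload used above:

import ai.rapids.cudf.Scalar
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.spark.sql.types.DataType

// Expand one partition-value scalar to numRows rows and wrap the result with
// the Spark type taken from the partition schema.
def partitionColumn(value: Scalar, numRows: Int, sparkType: DataType): GpuColumnVector =
  GpuColumnVector.from(ai.rapids.cudf.ColumnVector.fromScalar(value, numRows), sparkType)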