Update order by to not load native libraries when sorting (NVIDIA#2022)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Mar 25, 2021
1 parent c83a8c6 commit a8ce923
Showing 4 changed files with 15 additions and 14 deletions.
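
Why this change matters: in the cudf Java bindings, the sort helpers previously lived on ai.rapids.cudf.Table (Table.asc, Table.desc, and the nested Table.OrderByArg type), so merely building a sort specification forced the Table class to initialize, which loads the cuDF native libraries (this load-on-class-initialization behavior is inferred from the commit title). The standalone OrderByArg class carries no native dependency, so describing a sort order stays pure JVM work until the ordering is actually applied to a table. A minimal sketch of the new pattern, using only the OrderByArg calls that appear in the diffs below; SortSpecExample and its method names are hypothetical, for illustration only:

import ai.rapids.cudf.OrderByArg

object SortSpecExample {
  // Constructing OrderByArg values references only the OrderByArg class,
  // so no cuDF native libraries are loaded at this point.
  def ascNullsFirst(index: Int): OrderByArg = OrderByArg.asc(index, true)
  def descNullsLast(index: Int): OrderByArg = OrderByArg.desc(index, true)

  // Native code is only involved later, when the specification is handed
  // to an actual cudf Table, e.g. table.orderBy(ascNullsFirst(0)).
}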
(file 1 of 4)
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

-import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, Table}
+import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, Table}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution}
@@ -65,7 +65,7 @@ case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
      allColumns += parts
      allColumns ++= GpuColumnVector.extractBases(batch)
      withResource(new Table(allColumns: _*)) { fullTable =>
-        fullTable.orderBy(Table.asc(0))
+        fullTable.orderBy(OrderByArg.asc(0))
      }
    }
  }
(file 2 of 4)
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

-import ai.rapids.cudf.{ColumnVector, NvtxColor, Table}
+import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table}

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder}
import org.apache.spark.sql.types.DataType
@@ -33,11 +33,11 @@ object SortUtils extends Arm {
    case _ => None
  }

-  def getOrder(order: SortOrder, index: Int): Table.OrderByArg =
+  def getOrder(order: SortOrder, index: Int): OrderByArg =
    if (order.isAscending) {
-      Table.asc(index, order.nullOrdering == NullsFirst)
+      OrderByArg.asc(index, order.nullOrdering == NullsFirst)
    } else {
-      Table.desc(index, order.nullOrdering == NullsLast)
+      OrderByArg.desc(index, order.nullOrdering == NullsLast)
    }
}

@@ -88,7 +88,7 @@ class GpuSorter(
  private[this] lazy val (sortOrdersThatNeedComputation, cudfOrdering, cpuOrderingInternal) = {
    val sortOrdersThatNeedsComputation = mutable.ArrayBuffer[SortOrder]()
    val cpuOrdering = mutable.ArrayBuffer[SortOrder]()
-    val cudfOrdering = mutable.ArrayBuffer[Table.OrderByArg]()
+    val cudfOrdering = mutable.ArrayBuffer[OrderByArg]()
    var newColumnIndex = numInputColumns
    // Remove duplicates in the ordering itself because
    // there is no need to do it twice.
(file 3 of 4)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids

import scala.collection.mutable

-import ai.rapids.cudf.{ContiguousTable, Table}
+import ai.rapids.cudf.{ContiguousTable, OrderByArg, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.hadoop.fs.Path
@@ -276,7 +276,7 @@ class GpuDynamicPartitionDataWriter(
      val columnIds = 0 until t.getNumberOfColumns
      val distinct = t.groupBy(columnIds: _*).aggregate()
      try {
-        distinct.orderBy(columnIds.map(Table.asc(_, nullsSmallest)): _*)
+        distinct.orderBy(columnIds.map(OrderByArg.asc(_, nullsSmallest)): _*)
      } finally {
        distinct.close()
      }
(file 4 of 4)
@@ -16,14 +16,15 @@

package org.apache.spark.sql.rapids.execution.python

+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
import ai.rapids.cudf
-import ai.rapids.cudf.{Aggregation, Table}
+import ai.rapids.cudf.{Aggregation, OrderByArg}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer

import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
@@ -125,7 +126,7 @@ class GroupingIterator(
      }
    }
    val orderedTable = withResource(cntTable) { table =>
-      table.orderBy(partitionIndices.map(id => Table.asc(id, true)): _*)
+      table.orderBy(partitionIndices.map(id => OrderByArg.asc(id, true)): _*)
    }
    val (countHostCol, numRows) = withResource(orderedTable) { table =>
      // Yes copying the data to host, it would be OK since just copying the aggregated
