[SPARK-45667][CORE][SQL][CONNECT] Clean up the deprecated API usage related to `IterableOnceExtensionMethods`

### What changes were proposed in this pull request?
This PR cleans up the use of the following `IterableOnceExtensionMethods` APIs, which have been deprecated since Scala 2.13.0:

- `.toSeq` -> `.iterator.to(Seq)`
- `.toIterable` -> `.iterator.to(Iterable)`
- `.toTraversable` -> `.iterator.to(Iterable)`
- `.toArray` -> `.iterator.toArray`
- `.map` -> `.iterator.map`
- `.foreach` -> `.iterator.foreach`
- `.isEmpty` -> `.iterator.isEmpty`

```scala
  @deprecated("Use .iterator.to(Seq) instead", "2.13.0")
  @`inline` def toSeq: immutable.Seq[A] = immutable.Seq.from(it)

  @deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
  @`inline` final def toIterable: Iterable[A] = Iterable.from(it)

  @deprecated("Use .iterator.to(Iterable) instead", "2.13.0")
  @`inline` final def toTraversable: Traversable[A] = toIterable

  @deprecated("Use .iterator.toArray", "2.13.0")
  def toArray[B >: A: ClassTag]: Array[B] = it match {
    case it: Iterable[B] => it.toArray[B]
    case _ => it.iterator.toArray[B]
  }

  @deprecated("Use .iterator.map instead or consider requiring an Iterable", "2.13.0")
  def map[B](f: A => B): IterableOnce[B] = it match {
    case it: Iterable[A] => it.map(f)
    case _ => it.iterator.map(f)
  }

  @deprecated("Use .iterator.foreach(...) instead", "2.13.0")
  @`inline` def foreach[U](f: A => U): Unit = it match {
    case it: Iterable[A] => it.foreach(f)
    case _ => it.iterator.foreach(f)
  }

  @deprecated("Use .iterator.isEmpty instead", "2.13.0")
  def isEmpty: Boolean = it match {
    case it: Iterable[A] => it.isEmpty
    case _ => it.iterator.isEmpty
  }
```
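
To make the mechanics concrete, here is a minimal, self-contained sketch (not part of the changed files; the object name and the value `xs` are hypothetical) showing how the replacements read at a call site:

```scala
// Minimal sketch of the migration, assuming Scala 2.13.
object IterableOnceMigrationSketch {
  def main(args: Array[String]): Unit = {
    // A List is an Iterable, so calling .iterator repeatedly below is safe;
    // a bare Iterator would be consumed by its first traversal.
    val xs: IterableOnce[Int] = List(1, 2, 3)

    // Deprecated extension-method style (warns on Scala 2.13):
    //   xs.toSeq; xs.toArray; xs.map(_ * 2); xs.foreach(println); xs.isEmpty

    val asSeq: Seq[Int]        = xs.iterator.to(Seq)       // .toSeq -> .iterator.to(Seq)
    val asIter: Iterable[Int]  = xs.iterator.to(Iterable)  // .toIterable/.toTraversable
    val asArray: Array[Int]    = xs.iterator.toArray       // .toArray -> .iterator.toArray
    val doubled: Iterator[Int] = xs.iterator.map(_ * 2)    // .map -> .iterator.map
    xs.iterator.foreach(println)                            // .foreach -> .iterator.foreach
    println((asSeq, asIter, asArray.toSeq, doubled.toSeq, xs.iterator.isEmpty))
  }
}
```

Routing everything through `.iterator` keeps the single-pass nature of `IterableOnce` explicit at each call site, which is why the deprecation messages recommend it.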

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43532 from LuciferYang/SPARK-45667.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang committed Oct 27, 2023
1 parent 62a3868 commit 0f145f5
Showing 15 changed files with 17 additions and 17 deletions.
@@ -132,7 +132,7 @@ private[sql] object UdfUtils extends Serializable {
   def noOp[V, K](): V => K = _ => null.asInstanceOf[K]

   def iterableOnceToSeq[A, B](f: A => IterableOnce[B]): A => Seq[B] = { value =>
-    f(value).toSeq
+    f(value).iterator.to(Seq)
   }

   // (1 to 22).foreach { i =>
@@ -761,7 +761,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cleanF = self.context.clean(f)
     new MapPartitionsRDD[(K, U), (K, V)](self,
       (context, pid, iter) => iter.flatMap { case (k, v) =>
-        cleanF(v).map(x => (k, x))
+        cleanF(v).iterator.map(x => (k, x))
       },
       preservesPartitioning = true)
   }
core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala (1 addition, 1 deletion)
@@ -84,7 +84,7 @@ class UnionRDD[T: ClassTag](
     } else {
       rdds
     }
-    val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
+    val array = new Array[Partition](parRDDs.iterator.map(_.partitions.length).sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
@@ -42,7 +42,7 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
   override def knownSize: Int = size

   override def addAll(xs: IterableOnce[A]): this.type = {
-    xs.foreach { this += _ }
+    xs.iterator.foreach { this += _ }
     this
   }

@@ -52,7 +52,7 @@ class StatCounter(values: IterableOnce[Double]) extends Serializable {

   /** Add multiple values into this StatCounter, updating the internal statistics. */
   def merge(values: IterableOnce[Double]): StatCounter = {
-    values.foreach(v => merge(v))
+    values.iterator.foreach(v => merge(v))
     this
   }

core/src/main/scala/org/apache/spark/util/Utils.scala (1 addition, 1 deletion)
@@ -838,7 +838,7 @@ private[spark] object Utils
    * uses a local random number generator, avoiding inter-thread contention.
    */
   def randomize[T: ClassTag](seq: IterableOnce[T]): Seq[T] = {
-    randomizeInPlace(seq.toArray)
+    randomizeInPlace(seq.iterator.toArray)
   }

   /**
@@ -105,7 +105,7 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
       }

       case _ =>
-        values.foreach(e => this += e)
+        values.iterator.foreach(e => this += e)
     }
     this
   }
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._

 class GeneratorExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
   private def checkTuple(actual: Expression, expected: Seq[InternalRow]): Unit = {
-    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].toSeq === expected)
+    assert(actual.eval(null).asInstanceOf[IterableOnce[InternalRow]].iterator.to(Seq) === expected)
   }

   private final val empty_array = CreateArray(Seq.empty)
@@ -269,7 +269,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       Nil

   private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
-    assert(jt.eval(null).toSeq.head === expected)
+    assert(jt.eval(null).iterator.to(Seq).head === expected)
   }

   test("json_tuple escaping") {
@@ -97,7 +97,7 @@ case class GenerateExec(
           // we should always set the left (required child output)
           joinedRow.withLeft(pruneChildForResult(row))
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             joinedRow.withRight(generatorNullRow) :: Nil
           } else {
             outputRows.iterator.map(joinedRow.withRight)
@@ -110,7 +110,7 @@ case class GenerateExec(
       } else {
         iter.flatMap { row =>
           val outputRows = boundGenerator.eval(row)
-          if (outer && outputRows.isEmpty) {
+          if (outer && outputRows.iterator.isEmpty) {
             Seq(generatorNullRow)
           } else {
             outputRows
@@ -421,7 +421,7 @@ case class MapGroupsExec(
         val result = func(
           getKey(key),
           rowIter.map(getValue))
-        result.map(outputObject)
+        result.iterator.map(outputObject)
       }
     }
   }
@@ -653,7 +653,7 @@ case class CoGroupExec(
           getKey(key),
           leftResult.map(getLeft),
           rightResult.map(getRight))
-        result.map(outputObject)
+        result.iterator.map(outputObject)
       }
     }
   }
@@ -189,7 +189,7 @@ case class MemoryStream[A : Encoder](
   protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)

   def addData(data: IterableOnce[A]): Offset = {
-    val objects = data.toSeq
+    val objects = data.iterator.to(Seq)
     val rows = objects.iterator.map(d => toRow(d).copy().asInstanceOf[UnsafeRow]).toArray
     logDebug(s"Adding: $objects")
     this.synchronized {
@@ -58,7 +58,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa

   def addData(data: IterableOnce[A]): Offset = synchronized {
     // Distribute data evenly among partition lists.
-    data.toSeq.zipWithIndex.map {
+    data.iterator.to(Seq).zipWithIndex.map {
       case (item, index) =>
         records(index % numPartitions) += toRow(item).copy().asInstanceOf[UnsafeRow]
     }
@@ -122,7 +122,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       withClue(s"Expression class '$className'") {
         val exprExamples = info.getOriginalExamples
         if (!exprExamples.isEmpty && !ignoreSet.contains(className)) {
-          assert(exampleRe.findAllIn(exprExamples).toIterable
+          assert(exampleRe.findAllIn(exprExamples).iterator.to(Iterable)
             .filter(setStmtRe.findFirstIn(_).isEmpty) // Ignore SET commands
             .forall(_.contains("_FUNC_")))
         }
@@ -74,7 +74,7 @@ private[streaming] class MapWithStateDStreamImpl[
   /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
   def stateSnapshots(): DStream[(KeyType, StateType)] = {
     internalStream.flatMap {
-      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.toTraversable }
+      _.stateMap.getAll().map { case (k, s, _) => (k, s) }.iterator.to(Iterable) }
   }

   def keyClass: Class[_] = implicitly[ClassTag[KeyType]].runtimeClass
