diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5033ab00601ab..3bfd1abb48d9c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -692,11 +692,7 @@ jobs: - name: Install Python linter dependencies if: inputs.branch != 'branch-3.3' && inputs.branch != 'branch-3.4' && inputs.branch != 'branch-3.5' run: | - # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. - # See also https://github.com/sphinx-doc/sphinx/issues/7551. - # Jinja2 3.0.0+ causes error when building with Sphinx. - # See also https://issues.apache.org/jira/browse/SPARK-35375. - python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==23.9.1' + python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.982' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc jinja2 'black==23.9.1' python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.59.3' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0' - name: Python linter run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python @@ -745,13 +741,9 @@ jobs: Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" - name: Install dependencies for documentation generation run: | - # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. - # See also https://github.com/sphinx-doc/sphinx/issues/7551. - # Jinja2 3.0.0+ causes error when building with Sphinx. - # See also https://issues.apache.org/jira/browse/SPARK-35375. # Pin the MarkupSafe to 2.0.1 to resolve the CI error. # See also https://issues.apache.org/jira/browse/SPARK-38279. - python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme sphinx-copybutton nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' + python3.9 -m pip install 'sphinx==4.2.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 'markupsafe==2.0.1' 'pyzmq<24.0.0' python3.9 -m pip install ipython_genutils # See SPARK-38517 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 diff --git a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala index ab488e18ba3f4..75c56451592e4 100644 --- a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala +++ b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala @@ -80,7 +80,9 @@ class UTF8StringPropertyCheckSuite extends AnyFunSuite with ScalaCheckDrivenProp test("compare") { forAll { (s1: String, s2: String) => - assert(Math.signum(toUTF8(s1).compareTo(toUTF8(s2))) === Math.signum(s1.compareTo(s2))) + assert(Math.signum { + toUTF8(s1).compareTo(toUTF8(s2)).toFloat + } === Math.signum(s1.compareTo(s2).toFloat)) } } diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 19b70307a1cdd..5b70edf249d14 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1067,7 +1067,7 @@ }, "FAILED_EXECUTE_UDF" : { "message" : [ - "Failed to execute user defined function (: () => )." 
+ "User defined function (: () => ) failed due to: ." ], "sqlState" : "39000" }, diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index a1e57226e530f..d760c9d97693b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1532,6 +1532,41 @@ class Dataset[T] private[sql] ( proto.Aggregate.GroupType.GROUP_TYPE_CUBE) } + /** + * Create multi-dimensional aggregation for the current Dataset using the specified grouping + * sets, so we can run aggregation on them. See [[RelationalGroupedDataset]] for all the + * available aggregate functions. + * + * {{{ + * // Compute the average for all numeric columns group by specific grouping sets. + * ds.groupingSets(Seq(Seq($"department", $"group"), Seq()), $"department", $"group").avg() + * + * // Compute the max age and average salary, group by specific grouping sets. + * ds.groupingSets(Seq($"department", $"gender"), Seq()), $"department", $"group").agg(Map( + * "salary" -> "avg", + * "age" -> "max" + * )) + * }}} + * + * @group untypedrel + * @since 4.0.0 + */ + @scala.annotation.varargs + def groupingSets(groupingSets: Seq[Seq[Column]], cols: Column*): RelationalGroupedDataset = { + val groupingSetMsgs = groupingSets.map { groupingSet => + val groupingSetMsg = proto.Aggregate.GroupingSets.newBuilder() + for (groupCol <- groupingSet) { + groupingSetMsg.addGroupingSet(groupCol.expr) + } + groupingSetMsg.build() + } + new RelationalGroupedDataset( + toDF(), + cols, + proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS, + groupingSets = Some(groupingSetMsgs)) + } + /** * (Scala-specific) Aggregates on the entire Dataset without groups. 
* {{{ diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 5ed97e45c7701..776a6231eaecd 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -39,7 +39,8 @@ class RelationalGroupedDataset private[sql] ( private[sql] val df: DataFrame, private[sql] val groupingExprs: Seq[Column], groupType: proto.Aggregate.GroupType, - pivot: Option[proto.Aggregate.Pivot] = None) { + pivot: Option[proto.Aggregate.Pivot] = None, + groupingSets: Option[Seq[proto.Aggregate.GroupingSets]] = None) { private[this] def toDF(aggExprs: Seq[Column]): DataFrame = { df.sparkSession.newDataFrame { builder => @@ -60,6 +61,11 @@ class RelationalGroupedDataset private[sql] ( builder.getAggregateBuilder .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT) .setPivot(pivot.get) + case proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS => + assert(groupingSets.isDefined) + val aggBuilder = builder.getAggregateBuilder + .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPING_SETS) + groupingSets.get.foreach(aggBuilder.addGroupingSets) case g => throw new UnsupportedOperationException(g.toString) } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 5cc63bc45a04a..c5c917ebfa955 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -3017,6 +3017,12 @@ class PlanGenerationTestSuite simple.groupBy(Column("id")).pivot("a").agg(functions.count(Column("b"))) } + test("groupingSets") { + simple + .groupingSets(Seq(Seq(fn.col("a")), Seq.empty[Column]), fn.col("a")) + .agg("a" -> "max", "a" -> "count") + } + test("width_bucket") { simple.select(fn.width_bucket(fn.col("b"), fn.col("b"), fn.col("b"), fn.col("a"))) } diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala index cb5b97f2e4aff..8c8472d780dbc 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala @@ -55,7 +55,7 @@ object RetryPolicy { def defaultPolicy(): RetryPolicy = RetryPolicy( name = "DefaultPolicy", // Please synchronize changes here with Python side: - // pyspark/sql/connect/client/core.py + // pyspark/sql/connect/client/retries.py // // Note: these constants are selected so that the maximum tolerated wait is guaranteed // to be at least 10 minutes diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala index 488208574809b..53d8d46e62689 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowVectorReader.scala @@ -134,7 
+134,7 @@ private[arrow] class SmallIntVectorReader(v: SmallIntVector) private[arrow] class IntVectorReader(v: IntVector) extends TypedArrowVectorReader[IntVector](v) { override def getInt(i: Int): Int = vector.get(i) override def getLong(i: Int): Long = getInt(i) - override def getFloat(i: Int): Float = getInt(i) + override def getFloat(i: Int): Float = getInt(i).toFloat override def getDouble(i: Int): Double = getInt(i) override def getString(i: Int): String = String.valueOf(getInt(i)) override def getJavaDecimal(i: Int): JBigDecimal = JBigDecimal.valueOf(getInt(i)) @@ -143,8 +143,8 @@ private[arrow] class IntVectorReader(v: IntVector) extends TypedArrowVectorReade private[arrow] class BigIntVectorReader(v: BigIntVector) extends TypedArrowVectorReader[BigIntVector](v) { override def getLong(i: Int): Long = vector.get(i) - override def getFloat(i: Int): Float = getLong(i) - override def getDouble(i: Int): Double = getLong(i) + override def getFloat(i: Int): Float = getLong(i).toFloat + override def getDouble(i: Int): Double = getLong(i).toDouble override def getString(i: Int): String = String.valueOf(getLong(i)) override def getJavaDecimal(i: Int): JBigDecimal = JBigDecimal.valueOf(getLong(i)) override def getTimestamp(i: Int): Timestamp = toJavaTimestamp(getLong(i) * MICROS_PER_SECOND) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_encode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_encode.explain index 56da919abf4c5..2f65436059230 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_encode.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_encode.explain @@ -1,2 +1,2 @@ -Project [encode(g#0, UTF-8) AS encode(g, UTF-8)#0] +Project [encode(g#0, UTF-8, false) AS encode(g, UTF-8)#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain index e9513f0103c81..b62ccccc0c15e 100644 --- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain @@ -1,2 +1,2 @@ -Project [encode(g#0, UTF-8) AS to_binary(g, utf-8)#0] +Project [encode(g#0, UTF-8, false) AS to_binary(g, utf-8)#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/groupingSets.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/groupingSets.explain new file mode 100644 index 0000000000000..1e3fe1a987ef5 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/groupingSets.explain @@ -0,0 +1,4 @@ +Aggregate [a#0, spark_grouping_id#0L], [a#0, max(a#0) AS max(a)#0, count(a#0) AS count(a)#0L] ++- Expand [[id#0L, a#0, b#0, a#0, 0], [id#0L, a#0, b#0, null, 1]], [id#0L, a#0, b#0, a#0, spark_grouping_id#0L] + +- Project [id#0L, a#0, b#0, a#0 AS a#0] + +- LocalRelation , [id#0L, a#0, b#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.json b/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.json new file mode 100644 index 
0000000000000..6e84824ec7a3a --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.json @@ -0,0 +1,50 @@ +{ + "common": { + "planId": "1" + }, + "aggregate": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double\u003e" + } + }, + "groupType": "GROUP_TYPE_GROUPING_SETS", + "groupingExpressions": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }], + "aggregateExpressions": [{ + "unresolvedFunction": { + "functionName": "max", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a", + "planId": "0" + } + }] + } + }, { + "unresolvedFunction": { + "functionName": "count", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a", + "planId": "0" + } + }] + } + }], + "groupingSets": [{ + "groupingSet": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + }, { + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.proto.bin new file mode 100644 index 0000000000000..ce0294096706e Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/groupingSets.proto.bin differ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 95c5acc803d49..abfc063139056 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2235,7 +2235,7 @@ class SparkConnectPlanner( JoinWith.typedJoinWith( joined, - session.sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity, + session.sessionState.conf.dataFrameSelfJoinAutoResolveAmbiguity, session.sessionState.analyzer.resolver, rel.getJoinDataType.getIsLeftStruct, rel.getJoinDataType.getIsRightStruct) @@ -2563,6 +2563,8 @@ class SparkConnectPlanner( // To avoid explicit handling of the result on the client, we build the expected input // of the relation on the server. The client has to simply forward the result. val result = SqlCommandResult.newBuilder() + // Only filled when isCommand + val metrics = ExecutePlanResponse.Metrics.newBuilder() if (isCommand) { // Convert the results to Arrow. val schema = df.schema @@ -2596,10 +2598,10 @@ class SparkConnectPlanner( proto.LocalRelation .newBuilder() .setData(ByteString.copyFrom(bytes)))) + metrics.addAllMetrics(MetricGenerator.transformPlan(df).asJava) } else { - // Trigger assertExecutedPlanPrepared to ensure post ReadyForExecution before finished - // executedPlan is currently called by createMetricsResponse below - df.queryExecution.assertExecutedPlanPrepared() + // No execution triggered for relations. Manually set ready + tracker.setReadyForExecution() result.setRelation( proto.Relation .newBuilder() @@ -2622,8 +2624,17 @@ class SparkConnectPlanner( .setSqlCommandResult(result) .build()) - // Send Metrics - responseObserver.onNext(MetricGenerator.createMetricsResponse(sessionHolder, df)) + // Send Metrics when isCommand (i.e. show tables) which is eagerly executed & has metrics + // Skip metrics when !isCommand (i.e. 
select 1) which is not executed & doesn't have metrics + if (isCommand) { + responseObserver.onNext( + ExecutePlanResponse + .newBuilder() + .setSessionId(sessionHolder.sessionId) + .setServerSideSessionId(sessionHolder.serverSessionId) + .setMetrics(metrics.build) + .build) + } } private def handleRegisterUserDefinedFunction( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/MetricGenerator.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/MetricGenerator.scala index c9bba653e8a8f..e2e4128311871 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/MetricGenerator.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/MetricGenerator.scala @@ -51,6 +51,12 @@ private[connect] object MetricGenerator extends AdaptiveSparkPlanHelper { allChildren(p).flatMap(c => transformPlan(c, p.id)) } + private[connect] def transformPlan( + rows: DataFrame): Seq[ExecutePlanResponse.Metrics.MetricObject] = { + val executedPlan = rows.queryExecution.executedPlan + transformPlan(executedPlan, executedPlan.id) + } + private def transformPlan( p: SparkPlan, parentId: Int): Seq[ExecutePlanResponse.Metrics.MetricObject] = { diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index f5967a74ad339..c412486ce197e 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -146,7 +146,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp) val backpressureRate = lag / totalLag.toDouble * rate tp -> (if (maxRateLimitPerPartition > 0) { - Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate) + Math.min(backpressureRate, maxRateLimitPerPartition.toDouble)} else backpressureRate) } case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp).toDouble } } diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 286b073125ff0..6c57091bc3c46 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -98,7 +98,7 @@ private[spark] class KafkaRDD[K, V]( if (compacted) { super.countApprox(timeout, confidence) } else { - val c = count() + val c = count().toDouble new PartialResult(new BoundedDouble(c, 1.0, c, c), true) } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index faf114108fac5..28f0906258303 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -805,7 +805,7 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) time: Long, elements: Long, processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) + 
schedulingDelay: Long): Option[Double] = Some(rate.toDouble) } private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index d6363182606d9..e6d5a750ea325 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -378,7 +378,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( resources.foreach { case (k, v) => PythonRDD.writeUTF(k, dataOut) PythonRDD.writeUTF(v.name, dataOut) - dataOut.writeInt(v.addresses.size) + dataOut.writeInt(v.addresses.length) v.addresses.foreach { case addr => PythonRDD.writeUTF(addr, dataOut) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index cb325b37958ec..b2f35984d37f8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -83,13 +83,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { .flatMap(_.iterator) .groupBy(_._1) // group by resource name .map { case (rName, rInfoArr) => - rName -> rInfoArr.map(_._2.addresses.size).sum + rName -> rInfoArr.map(_._2.addresses.length).sum } val usedInfo = aliveWorkers.map(_.resourcesInfoUsed) .flatMap(_.iterator) .groupBy(_._1) // group by resource name .map { case (rName, rInfoArr) => - rName -> rInfoArr.map(_._2.addresses.size).sum + rName -> rInfoArr.map(_._2.addresses.length).sum } formatResourcesUsed(totalInfo, usedInfo) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala index 486e59652218b..8c474e9b76c6a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala @@ -46,7 +46,7 @@ class ExecutorMetrics private[spark] extends Serializable { private[spark] def this(metrics: Array[Long]) = { this() - Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.length, this.metrics.length)) } private[spark] def this(metrics: AtomicLongArray) = { diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 978afaffab30b..4897cf694ae8e 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -74,7 +74,7 @@ private[spark] class FixedLengthBinaryInputFormat if (defaultSize < recordLength) { recordLength.toLong } else { - (Math.floor(defaultSize / recordLength) * recordLength).toLong + defaultSize / recordLength * recordLength } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala index 877f04b1adc01..189d390d37999 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala @@ -124,9 +124,9 @@ private[spark] class StatsdReporter( private def reportTimer(name: String, timer: 
Timer)(implicit socket: DatagramSocket): Unit = { val snapshot = timer.getSnapshot - send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER) + send(fullName(name, "max"), format(convertDuration(snapshot.getMax.toDouble)), TIMER) send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER) - send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER) + send(fullName(name, "min"), format(convertDuration(snapshot.getMin.toDouble)), TIMER) send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER) send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER) send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER) diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala index cbee136871012..a974ca2f1a05b 100644 --- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala @@ -35,7 +35,7 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double) override def currentResult(): BoundedDouble = { if (outputsMerged == totalOutputs) { - new BoundedDouble(sum, 1.0, sum, sum) + new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble) } else if (outputsMerged == 0 || sum == 0) { new BoundedDouble(0, 0.0, 0.0, Double.PositiveInfinity) } else { @@ -57,7 +57,8 @@ private[partial] object CountEvaluator { val low = dist.inverseCumulativeProbability((1 - confidence) / 2) val high = dist.inverseCumulativeProbability((1 + confidence) / 2) // Add 'sum' to each because distribution is just of remaining count, not observed - new BoundedDouble(sum + dist.getNumericalMean, confidence, sum + low, sum + high) + new BoundedDouble( + sum + dist.getNumericalMean, confidence, (sum + low).toDouble, (sum + high).toDouble) } diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index d2b4187df5d50..7cd60815fadbe 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -41,7 +41,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf override def currentResult(): Map[T, BoundedDouble] = { if (outputsMerged == totalOutputs) { - sums.map { case (key, sum) => (key, new BoundedDouble(sum, 1.0, sum, sum)) }.toMap + sums.map { case (key, sum) => + (key, new BoundedDouble(sum.toDouble, 1.0, sum.toDouble, sum.toDouble)) + }.toMap } else if (outputsMerged == 0) { new HashMap[T, BoundedDouble] } else { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 9080be01a9e66..fe08e8337f76f 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -303,7 +303,7 @@ private[spark] object ResourceUtils extends Logging { allocations: Map[String, ResourceInformation], execReqs: Map[String, ExecutorResourceRequest]): Unit = { execReqs.foreach { case (rName, req) => - require(allocations.contains(rName) && allocations(rName).addresses.size >= req.amount, + require(allocations.contains(rName) && allocations(rName).addresses.length >= req.amount, s"Resource: ${rName}, with addresses: " + 
s"${allocations(rName).addresses.mkString(",")} " + s"is less than what the user requested: ${req.amount})") @@ -476,7 +476,7 @@ private[spark] object ResourceUtils extends Logging { if (maxTaskPerExec < (execAmount * numParts / taskAmount)) { val origTaskAmount = treq.amount val taskReqStr = s"${origTaskAmount}/${numParts}" - val resourceNumSlots = Math.floor(execAmount * numParts / taskAmount).toInt + val resourceNumSlots = (execAmount * numParts / taskAmount).toInt val message = s"The configuration of resource: ${treq.resourceName} " + s"(exec = ${execAmount}, task = ${taskReqStr}, " + s"runnable tasks = ${resourceNumSlots}) will " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index d10cf55ed0d10..113521453ad7b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -95,7 +95,7 @@ private[spark] object MapStatus { } else if (size <= 1L) { 1 } else { - math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte + math.min(255, math.ceil(math.log(size.toDouble) / math.log(LOG_BASE)).toInt).toByte } } @@ -276,12 +276,12 @@ private[spark] object HighlyCompressedMapStatus { val skewSizeThreshold = Math.max( medianSize * accurateBlockSkewedFactor, - sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber) + sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber).toDouble ) - Math.min(shuffleAccurateBlockThreshold, skewSizeThreshold) + Math.min(shuffleAccurateBlockThreshold.toDouble, skewSizeThreshold) } else { // Disable skew detection if accurateBlockSkewedFactor <= 0 - shuffleAccurateBlockThreshold + shuffleAccurateBlockThreshold.toDouble } val hugeBlockSizes = mutable.Map.empty[Int, Byte] diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 6e6507782a49e..75032086ead72 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -80,7 +80,7 @@ private[spark] object TaskDescription { map.foreach { case (key, value) => dataOut.writeUTF(key) dataOut.writeUTF(value.name) - dataOut.writeInt(value.addresses.size) + dataOut.writeInt(value.addresses.length) value.addresses.foreach(dataOut.writeUTF(_)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 41f6b3ad64bf5..15ae2fef221d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -434,7 +434,7 @@ private[spark] class TaskSchedulerImpl( // addresses are the same as that we allocated in taskResourceAssignments since it's // synchronized. We don't remove the exact addresses allocated because the current // approach produces the identical result with less time complexity. 
- availableResources(i)(rName).remove(0, rInfo.addresses.size) + availableResources(i)(rName).remove(0, rInfo.addresses.length) } } } catch { @@ -752,7 +752,7 @@ private[spark] class TaskSchedulerImpl( .mkString(",") addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr)) - logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for " + + logInfo(s"Successfully scheduled all the ${addressesWithDescs.length} tasks for " + s"barrier stage ${taskSet.stageId}.") } taskSet.barrierPendingLaunchTasks.clear() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6157a3e46c875..d17e6735c4ecf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -809,7 +809,7 @@ private[spark] class TaskSetManager( info.markFinished(TaskState.FINISHED, clock.getTimeMillis()) if (speculationEnabled) { - successfulTaskDurations.insert(info.duration) + successfulTaskDurations.insert(info.duration.toDouble) taskProcessRateCalculator.foreach(_.updateAvgTaskProcessRate(tid, result)) } removeRunningTask(tid) @@ -1196,7 +1196,7 @@ private[spark] class TaskSetManager( val timeMs = clock.getTimeMillis() if (numSuccessfulTasks >= minFinishedForSpeculation) { val medianDuration = successfulTaskDurations.percentile() - val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation) + val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation.toDouble) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. logDebug("Task length threshold for speculation: " + threshold) @@ -1204,7 +1204,8 @@ private[spark] class TaskSetManager( } else if (isSpeculationThresholdSpecified && speculationTasksLessEqToSlots) { val threshold = speculationTaskDurationThresOpt.get logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold") - foundTasks = checkAndSubmitSpeculatableTasks(timeMs, threshold, customizedThreshold = true) + foundTasks = checkAndSubmitSpeculatableTasks( + timeMs, threshold.toDouble, customizedThreshold = true) } // avoid more warning logs. if (foundTasks) { diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala index dff94b4e875de..b5473e076946b 100644 --- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala +++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala @@ -74,7 +74,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { * the progress bar, then progress bar will be showed in next line without overwrite logs. 
*/ private def show(now: Long, stages: Seq[StageData]): Unit = { - val width = TerminalWidth / stages.size + val width = TerminalWidth / stages.length val bar = stages.map { s => val total = s.numTasks val header = s"[Stage ${s.stageId}:" diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala index 226f15d3d38c2..e0cb3f4188e6d 100644 --- a/core/src/main/scala/org/apache/spark/util/Clock.scala +++ b/core/src/main/scala/org/apache/spark/util/Clock.scala @@ -85,7 +85,7 @@ private[spark] class SystemClock extends Clock { return currentTime } - val pollTime = math.max(waitTime / 10.0, minPollTime).toLong + val pollTime = math.max(waitTime / 10.0, minPollTime.toDouble).toLong while (true) { currentTime = System.currentTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 3245a528b74cf..4c7b12f60cc8d 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -245,7 +245,7 @@ private[spark] object HadoopFSUtils extends Logging { val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val filteredNestedFiles: Seq[FileStatus] = contextOpt match { - case Some(context) if dirs.size > parallelismThreshold => + case Some(context) if dirs.length > parallelismThreshold => parallelListLeafFilesInternal( context, dirs.map(_.getPath).toImmutableArraySeq, diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala index 23fc0f88f0b93..ec74ce0473efd 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala @@ -69,7 +69,7 @@ private[io] class ChunkedByteBufferFileRegion( if (keepGoing) { // advance to the next chunk (if there are any more) currentChunkIdx += 1 - if (currentChunkIdx == chunks.size) { + if (currentChunkIdx == chunks.length) { keepGoing = false } else { currentChunk = chunks(currentChunkIdx) diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala index f08cf44e4e12b..08e2ea01f623e 100644 --- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala @@ -98,8 +98,8 @@ private[spark] object StratifiedSamplingUtils extends Logging { if (acceptResult.areBoundsEmpty) { val n = counts.get(key) val sampleSize = math.ceil(n * fraction).toLong - val lmbd1 = PoissonBounds.getLowerBound(sampleSize) - val lmbd2 = PoissonBounds.getUpperBound(sampleSize) + val lmbd1 = PoissonBounds.getLowerBound(sampleSize.toDouble) + val lmbd2 = PoissonBounds.getUpperBound(sampleSize.toDouble) acceptResult.acceptBound = lmbd1 / n acceptResult.waitListBound = (lmbd2 - lmbd1) / n } diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index c425596eb0433..874f4896bb01e 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -170,10 +170,10 @@ trait RDDCheckpointTester { self: SparkFunSuite => * upon checkpointing. 
Ignores the checkpointData field, which may grow when we checkpoint. */ private def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { - val rddSize = Utils.serialize(rdd).size - val rddCpDataSize = Utils.serialize(rdd.checkpointData).size - val rddPartitionSize = Utils.serialize(rdd.partitions).size - val rddDependenciesSize = Utils.serialize(rdd.dependencies).size + val rddSize = Utils.serialize(rdd).length + val rddCpDataSize = Utils.serialize(rdd.checkpointData).length + val rddPartitionSize = Utils.serialize(rdd.partitions).length + val rddDependenciesSize = Utils.serialize(rdd.dependencies).length // Print detailed size, helps in debugging logInfo("Serialized sizes of " + rdd + @@ -339,7 +339,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS runTest("ParallelCollectionRDD") { reliableCheckpoint: Boolean => val parCollection = sc.makeRDD(1 to 4, 2) - val numPartitions = parCollection.partitions.size + val numPartitions = parCollection.partitions.length checkpoint(parCollection, reliableCheckpoint) assert(parCollection.dependencies === Nil) val result = parCollection.collect() @@ -358,7 +358,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) val blockRDD = new BlockRDD[String](sc, Array(blockId)) - val numPartitions = blockRDD.partitions.size + val numPartitions = blockRDD.partitions.length checkpoint(blockRDD, reliableCheckpoint) val result = blockRDD.collect() if (reliableCheckpoint) { @@ -507,7 +507,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean => val rdd = new BlockRDD[Int](sc, Array.empty[BlockId]) - assert(rdd.partitions.size === 0) + assert(rdd.partitions.length === 0) assert(rdd.isCheckpointed === false) assert(rdd.isCheckpointedAndMaterialized === false) checkpoint(rdd, reliableCheckpoint) @@ -516,7 +516,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS assert(rdd.count() === 0) assert(rdd.isCheckpointed) assert(rdd.isCheckpointedAndMaterialized) - assert(rdd.partitions.size === 0) + assert(rdd.partitions.length === 0) } runTest("checkpointAllMarkedAncestors") { reliableCheckpoint: Boolean => diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index e156533be15ca..a2b09f0ef3c3a 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -80,7 +80,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc = new SparkContext(clusterUrl, "test") val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 5) val groups = pairs.groupByKey(5).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -264,8 +264,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(Seq(true, true), 2) assert(data.count() === 2) // force executors to start - assert(data.map(markNodeIfIdentity).collect().size === 2) - assert(data.map(failOnMarkedIdentity).collect().size === 2) + 
assert(data.map(markNodeIfIdentity).collect().length === 2) + assert(data.map(failOnMarkedIdentity).collect().length === 2) } test("recover from repeated node failures during shuffle-map") { @@ -275,7 +275,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex for (i <- 1 to 3) { val data = sc.parallelize(Seq(true, false), 2) assert(data.count() === 2) - assert(data.map(markNodeIfIdentity).collect().size === 2) + assert(data.map(markNodeIfIdentity).collect().length === 2) assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey().count() === 2) } } @@ -287,7 +287,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex for (i <- 1 to 3) { val data = sc.parallelize(Seq(true, true), 2) assert(data.count() === 2) - assert(data.map(markNodeIfIdentity).collect().size === 2) + assert(data.map(markNodeIfIdentity).collect().length === 2) // This relies on mergeCombiners being used to perform the actual reduce for this // test to actually be testing what it claims. val grouped = data.map(x => x -> x).combineByKey( @@ -295,7 +295,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex (x: Boolean, y: Boolean) => x, (x: Boolean, y: Boolean) => failOnMarkedIdentity(x) ) - assert(grouped.collect().size === 1) + assert(grouped.collect().length === 1) } } @@ -310,8 +310,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex data.persist(StorageLevel.MEMORY_ONLY_2) assert(data.count() === 4) - assert(data.map(markNodeIfIdentity).collect().size === 4) - assert(data.map(failOnMarkedIdentity).collect().size === 4) + assert(data.map(markNodeIfIdentity).collect().length === 4) + assert(data.map(failOnMarkedIdentity).collect().length === 4) // Create a new replicated RDD to make sure that cached peer information doesn't cause // problems. diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 4a2b2339159cb..7750db6020887 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -236,7 +236,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Try reading the output back as an object file val ct = reflect.ClassTag[Any](Utils.classForName(className, noSparkClassLoader = true)) val output = sc.objectFile[Any](outputDir) - assert(output.collect().size === 3) + assert(output.collect().length === 3) assert(output.collect().head.getClass.getName === className) } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index dde30aee82878..5d635011d2ec6 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -237,13 +237,13 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { // as it has 4 out of 7 bytes of output. val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5) assert(topLocs50.nonEmpty) - assert(topLocs50.get.size === 1) + assert(topLocs50.get.length === 1) assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000)) // When the threshold is 20%, both hosts should be returned as preferred locations. 
val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2) assert(topLocs20.nonEmpty) - assert(topLocs20.get.size === 2) + assert(topLocs20.get.length === 2) assert(topLocs20.get.toSet === Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet) diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 28fa9f5e23e79..3447ba8c1765e 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -77,7 +77,7 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva for (element <- 1 to 1000) { val partition = partitioner.getPartition(element) if (numPartitions > 1) { - if (partition < rangeBounds.size) { + if (partition < rangeBounds.length) { assert(element <= rangeBounds(partition)) } if (partition > 0) { @@ -111,7 +111,7 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva assert(count === rdd.count()) sketched.foreach { case (idx, n, sample) => assert(n === idx) - assert(sample.size === math.min(n, sampleSizePerPartition)) + assert(sample.length === math.min(n, sampleSizePerPartition)) } } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index a92d532907adf..ac10a00d98e04 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -51,7 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalRootDi sc = new SparkContext("local", "test", myConf) val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala index 0b33e2a9426ce..e7315d6119be0 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -163,7 +163,7 @@ private[spark] class Benchmark( // scalastyle:on assert(runTimes.nonEmpty) val best = runTimes.min - val avg = runTimes.sum / runTimes.size + val avg = runTimes.sum.toDouble / runTimes.size val stdev = if (runTimes.size > 1) { math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1)) } else 0 diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala index 3b3bcff0c5a3f..20993df718a3b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala @@ -439,7 +439,7 @@ class DecommissionWorkerSuite val appId = sc.applicationId eventually(timeout(1.minute), interval(1.seconds)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.getExecutorLimit === Int.MaxValue) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index a032e9aa16be9..553d001285b2d 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1736,7 +1736,7 @@ object SimpleApplicationTest { .map(x => SparkEnv.get.conf.get(config)) .collect() .distinct - if (executorValues.size != 1) { + if (executorValues.length != 1) { throw new SparkException(s"Inconsistent values for $config: " + s"${executorValues.mkString("values(", ", ", ")")}") } @@ -1795,7 +1795,7 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem { class TestSparkApplication extends SparkApplication with Matchers { override def start(args: Array[String], conf: SparkConf): Unit = { - assert(args.size === 1) + assert(args.length === 1) assert(args(0) === "hello") assert(conf.get("spark.test.hello") === "world") assert(sys.props.get("spark.test.hello") === None) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 01995ca3632d2..5ecc551c16b8c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -69,7 +69,7 @@ class StandaloneDynamicAllocationSuite workers = makeWorkers(10, 2048) // Wait until all workers register with master successfully eventually(timeout(1.minute), interval(10.milliseconds)) { - assert(getMasterState.workers.size === numWorkers) + assert(getMasterState.workers.length === numWorkers) } } @@ -93,7 +93,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -140,7 +140,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4)) @@ -195,7 +195,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8)) @@ -248,7 +248,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 10) // 20 cores total assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -302,7 +302,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 4) // 8 cores total assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -360,7 +360,7 @@ class StandaloneDynamicAllocationSuite sc.requestExecutors(2) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() 
- assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === 2) @@ -385,7 +385,7 @@ class StandaloneDynamicAllocationSuite sc.requestExecutors(2) eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === 2) @@ -425,7 +425,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) @@ -465,7 +465,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === initialExecutorLimit) assert(apps.head.getExecutorLimit === initialExecutorLimit) @@ -477,7 +477,7 @@ class StandaloneDynamicAllocationSuite val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() - assert(apps.size === 1) + assert(apps.length === 1) assert(apps.head.id === appId) assert(apps.head.executors.size === 2) assert(apps.head.getExecutorLimit === Int.MaxValue) diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index d109ed8442d44..3555faf5c2cb9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -71,7 +71,7 @@ class AppClientSuite workers = makeWorkers(10, 2048) // Wait until all workers register with master successfully eventually(timeout(1.minute), interval(10.milliseconds)) { - assert(getMasterState.workers.size === numWorkers) + assert(getMasterState.workers.length === numWorkers) } } @@ -99,7 +99,7 @@ class AppClientSuite eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") - assert(apps.size === 1, "master should have 1 registered app") + assert(apps.length === 1, "master should have 1 registered app") } // Send message to Master to request Executors, verify request by change in executor limit @@ -176,7 +176,7 @@ class AppClientSuite eventually(timeout(10.seconds), interval(10.millis)) { val apps = getApplications() assert(ci.listener.connectedIdList.size === 1, "client listener should have one connection") - assert(apps.size === 1, "master should have 1 registered app") + assert(apps.length === 1, "master should have 1 registered app") } // Send message to Master to request Executors with multiple resource profiles. 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index ac89f60955eed..0161917f8853d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -56,7 +56,7 @@ object EventLogTestHelper { eventStr: String, desiredSize: Long): Seq[String] = { val stringLen = eventStr.getBytes(StandardCharsets.UTF_8).length - val repeatCount = Math.floor(desiredSize / stringLen).toInt + val repeatCount = (desiredSize / stringLen).toInt (0 until repeatCount).map { _ => writer.writeEvent(eventStr, flushLogger = true) eventStr diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d16e904bdcf13..3013a5bf4a294 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1113,13 +1113,13 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === logCount) + assert(new File(testDir.toURI).listFiles().length === logCount) // Move the clock forward 1 day and scan the files again. They should still be there. clock.advance(TimeUnit.DAYS.toMillis(1)) provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === logCount) + assert(new File(testDir.toURI).listFiles().length === logCount) // Update the slow app to contain valid info. Code should detect the change and not clean // it up. @@ -1133,7 +1133,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P clock.advance(TimeUnit.DAYS.toMillis(2)) provider.checkForLogs() provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === validLogCount) + assert(new File(testDir.toURI).listFiles().length === validLogCount) } test("always find end event for finished apps") { @@ -1414,12 +1414,12 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() // The invalid application log file would be cleaned by checkAndCleanLog(). - assert(new File(testDir.toURI).listFiles().size === 1) + assert(new File(testDir.toURI).listFiles().length === 1) clock.advance(1) // cleanLogs() would clean the valid application log file. 
provider.cleanLogs() - assert(new File(testDir.toURI).listFiles().size === 0) + assert(new File(testDir.toURI).listFiles().length === 0) } private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { @@ -1556,7 +1556,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) provider.checkForLogs() provider.cleanLogs() - assert(dir.listFiles().size === 1) + assert(dir.listFiles().length === 1) assert(provider.getListing().length === 1) // Manually delete the appstatus file to make an invalid rolling event log @@ -1578,7 +1578,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() assert(provider.getListing().length === 1) - assert(dir.listFiles().size === 2) + assert(dir.listFiles().length === 2) // Make sure a new provider sees the valid application provider.stop() @@ -1615,7 +1615,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 1st checkForLogs should scan/update app2 only since it is newer than app1 provider.checkForLogs() assert(provider.getListing().length === 1) - assert(dir.listFiles().size === 2) + assert(dir.listFiles().length === 2) assert(provider.getListing().map(e => e.id).contains("app2")) assert(!provider.getListing().map(e => e.id).contains("app1")) @@ -1630,7 +1630,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 2nd checkForLogs should scan/update app3 only since it is newer than app1 provider.checkForLogs() assert(provider.getListing().length === 2) - assert(dir.listFiles().size === 3) + assert(dir.listFiles().length === 3) assert(provider.getListing().map(e => e.id).contains("app3")) assert(!provider.getListing().map(e => e.id).contains("app1")) @@ -1655,7 +1655,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) provider.checkForLogs() provider.cleanLogs() - assert(dir.listFiles().size === 1) + assert(dir.listFiles().length === 1) assert(provider.getListing().length === 1) // Manually delete event log files and create event log file reader diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 2f645e69079a2..abe05a8055843 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -289,7 +289,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite { val statusRequestPath = s"$httpUrl/$v/submissions/status" val goodJson = constructSubmitRequest(masterUrl).toJson val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON - val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON + val badJson2 = goodJson.substring(goodJson.length / 2) // malformed JSON val notJson = "\"hello, world\"" val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index e64ebe2a55142..0fc0b7536067e 100644 --- 
a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -97,7 +97,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite { val res = sc.wholeTextFiles(dir.toString, 3).collect() - assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size, + assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length, "Number of files read out does not fit with the actual value.") for ((filename, contents) <- res) { @@ -120,7 +120,7 @@ class WholeTextFileRecordReaderSuite extends SparkFunSuite { val res = sc.wholeTextFiles(dir.toString, 3).collect() - assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size, + assert(res.length === WholeTextFileRecordReaderSuite.fileNames.length, "Number of files read out does not fit with the actual value.") for ((filename, contents) <- res) { diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index ef214bd50d928..95b484d7176a5 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -214,11 +214,11 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext { } val execFiles = children.filter(_.getName.startsWith(NonLocalModeSparkPlugin.executorFileStr)) - assert(execFiles.size === 1) + assert(execFiles.length === 1) val allLines = Files.readLines(execFiles(0), StandardCharsets.UTF_8) assert(allLines.size === 1) val addrs = NonLocalModeSparkPlugin.extractGpuAddrs(allLines.get(0)) - assert(addrs.size === 2) + assert(addrs.length === 2) assert(addrs.sorted === Array("3", "4")) assert(NonLocalModeSparkPlugin.driverContext != null) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index 56783de1c13b4..4239180ba6c37 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -91,7 +91,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with TimeLimits { val expected = input.take(num) val saw = rdd.takeAsync(num).get() assert(saw == expected, "incorrect result for rdd with %d partitions (expected %s, saw %s)" - .format(rdd.partitions.size, expected, saw)) + .format(rdd.partitions.length, expected, saw)) } val input = Range(1, 1000) diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala index f644fee74a18b..591b8b4c0df7e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala @@ -159,7 +159,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext { test("missing checkpoint block fails with informative message") { val rdd = newRdd.localCheckpoint() - val numPartitions = rdd.partitions.size + val numPartitions = rdd.partitions.length val partitionIndices = rdd.partitions.map(_.index) val bmm = sc.env.blockManager.master diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 9b60d2eeeed1b..e436d98843411 100644 --- 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -41,7 +41,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val pairs = sc.parallelize(Seq((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2) val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect() - assert(sets.size === 3) + assert(sets.length === 3) val valuesFor1 = sets.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1)) val valuesFor3 = sets.find(_._1 == 3).get._2 @@ -53,7 +53,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -63,7 +63,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with duplicates") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -73,7 +73,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with negative key hash codes") { val pairs = sc.parallelize(Seq((-1, 1), (-1, 2), (-1, 3), (2, 1))) val groups = pairs.groupByKey().collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesForMinus1 = groups.find(_._1 == -1).get._2 assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -83,7 +83,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { test("groupByKey with many output partitions") { val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey(10).collect() - assert(groups.size === 2) + assert(groups.length === 2) val valuesFor1 = groups.find(_._1 == 1).get._2 assert(valuesFor1.toList.sorted === List(1, 2, 3)) val valuesFor2 = groups.find(_._1 == 2).get._2 @@ -249,7 +249,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) + assert(joined.length === 4) assert(joined.toSet === Set( (1, (1, 'x')), (1, (2, 'x')), @@ -262,7 +262,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (1, 3))) val rdd2 = sc.parallelize(Seq((1, 'x'), (1, 'y'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) + assert(joined.length === 6) assert(joined.toSet === Set( (1, (1, 'x')), (1, (1, 'y')), @@ -277,7 +277,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) + assert(joined.length === 5) assert(joined.toSet === Set( (1, (1, Some('x'))), (1, (2, Some('x'))), @@ -296,7 +296,7 @@ 
class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT) val joined = rdd1.cogroup(rdd2).collect() - assert(joined.size > 0) + assert(joined.length > 0) } // See SPARK-9326 @@ -307,7 +307,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5) val joined = rdd1.cogroup(rdd2).collect() - assert(joined.size > 0) + assert(joined.length > 0) } // See SPARK-22465 @@ -377,7 +377,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) + assert(joined.length === 5) assert(joined.toSet === Set( (1, (Some(1), 'x')), (1, (Some(2), 'x')), @@ -391,7 +391,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.fullOuterJoin(rdd2).collect() - assert(joined.size === 6) + assert(joined.length === 6) assert(joined.toSet === Set( (1, (Some(1), Some('x'))), (1, (Some(2), Some('x'))), @@ -406,14 +406,14 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) + assert(joined.length === 0) } test("join with many output partitions") { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.join(rdd2, 10).collect() - assert(joined.size === 4) + assert(joined.length === 4) assert(joined.toSet === Set( (1, (1, 'x')), (1, (2, 'x')), @@ -426,7 +426,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val joined = rdd1.groupWith(rdd2).collect() - assert(joined.size === 4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet assert(joinedSet === Set( (1, (List(1, 2), List('x'))), @@ -441,7 +441,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) val joined = rdd1.groupWith(rdd2, rdd3).collect() - assert(joined.size === 4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet assert(joinedSet === Set( @@ -458,7 +458,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd'))) val rdd4 = sc.parallelize(Seq((2, '@'))) val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect() - assert(joined.size === 4) + assert(joined.length === 4) val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet assert(joinedSet === Set( @@ -492,14 +492,14 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext 
{ val b = a.map(a => (a, (a * 2).toString)) // then a group by, and see we didn't revert to 2 partitions val c = b.groupByKey() - assert(c.partitions.size === 2000) + assert(c.partitions.length === 2000) } test("default partitioner uses largest partitioner") { val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2) val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000) val c = a.join(b) - assert(c.partitions.size === 2000) + assert(c.partitions.length === 2000) } test("subtract") { @@ -507,7 +507,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val b = sc.parallelize(Array(2, 3, 4).toImmutableArraySeq, 4) val c = a.subtract(b) assert(c.collect().toSet === Set(1)) - assert(c.partitions.size === a.partitions.size) + assert(c.partitions.length === a.partitions.length) } test("subtract with narrow dependency") { @@ -531,7 +531,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { val b = sc.parallelize(Seq((2, 20), (3, 30), (4, 40)), 4) val c = a.subtractByKey(b) assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitions.size === a.partitions.size) + assert(c.partitions.length === a.partitions.length) } test("subtractByKey with narrow dependency") { @@ -795,7 +795,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { assertBinomialSample(exact = exact, actual = v.toInt, trials = trials(k).toInt, p = samplingRate) } - assert(takeSample.size === takeSample.toSet.size) + assert(takeSample.length === takeSample.toSet.size) takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") } } diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 3a097e5335a2a..7f12d8b624c84 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -47,7 +47,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val piped = nums.pipe(Seq("cat")) val c = piped.collect() - assert(c.size === 4) + assert(c.length === 4) assert(c(0) === "1") assert(c(1) === "2") assert(c(2) === "3") @@ -61,7 +61,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { val c = piped.collect() - assert(c.size === 2) + assert(c.length === 2) assert(c(0).trim === "2") assert(c(1).trim === "2") } @@ -129,7 +129,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val c = piped.collect() - assert(c.size === 8) + assert(c.length === 8) assert(c(0) === "0") assert(c(1) === "\u0001") assert(c(2) === "1_") @@ -151,7 +151,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall f(e + "_") } }).collect() - assert(d.size === 8) + assert(d.length === 8) assert(d(0) === "0") assert(d(1) === "\u0001") assert(d(2) === "b\t2_") @@ -216,7 +216,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall val nums = sc.makeRDD(Array(1, 2, 3, 4).toImmutableArraySeq, 2) val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) val c = piped.collect() - assert(c.size === 4) + assert(c.length === 4) assert(c(0) === "1") assert(c(1) === "2") assert(c(2) === "3") diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 32ba2053258eb..706ebfa936470 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -322,7 +322,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("empty RDD") { val empty = new EmptyRDD[Int](sc) assert(empty.count() === 0) - assert(empty.collect().size === 0) + assert(empty.collect().length === 0) val thrown = intercept[UnsupportedOperationException]{ empty.reduce(_ + _) @@ -331,12 +331,12 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val emptyKv = new EmptyRDD[(Int, Int)](sc) val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x)) - assert(rdd.join(emptyKv).collect().size === 0) - assert(rdd.rightOuterJoin(emptyKv).collect().size === 0) - assert(rdd.leftOuterJoin(emptyKv).collect().size === 2) - assert(rdd.fullOuterJoin(emptyKv).collect().size === 2) - assert(rdd.cogroup(emptyKv).collect().size === 2) - assert(rdd.union(emptyKv).collect().size === 2) + assert(rdd.join(emptyKv).collect().length === 0) + assert(rdd.rightOuterJoin(emptyKv).collect().length === 0) + assert(rdd.leftOuterJoin(emptyKv).collect().length === 2) + assert(rdd.fullOuterJoin(emptyKv).collect().length === 2) + assert(rdd.cogroup(emptyKv).collect().length === 2) + assert(rdd.union(emptyKv).collect().length === 2) } test("repartitioned RDDs") { @@ -348,7 +348,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // Coalesce partitions val repartitioned1 = data.repartition(2) - assert(repartitioned1.partitions.size == 2) + assert(repartitioned1.partitions.length == 2) val partitions1 = repartitioned1.glom().collect() assert(partitions1(0).length > 0) assert(partitions1(1).length > 0) @@ -356,7 +356,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // Split partitions val repartitioned2 = data.repartition(20) - assert(repartitioned2.partitions.size == 20) + assert(repartitioned2.partitions.length == 20) val partitions2 = repartitioned2.glom().collect() assert(partitions2(0).length > 0) assert(partitions2(19).length > 0) @@ -370,7 +370,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(input.toImmutableArraySeq, initialPartitions) val repartitioned1 = data.repartition(2) - assert(repartitioned1.partitions.size == 2) + assert(repartitioned1.partitions.length == 2) val partitions1 = repartitioned1.glom().collect() // some noise in balancing is allowed due to randomization assert(math.abs(partitions1(0).length - 500) < initialPartitions) @@ -380,7 +380,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int): Unit = { val data = sc.parallelize(input, initialPartitions) val repartitioned = data.repartition(finalPartitions) - assert(repartitioned.partitions.size === finalPartitions) + assert(repartitioned.partitions.length === finalPartitions) val partitions = repartitioned.glom().collect() // assert all elements are present assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq) @@ -441,7 +441,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { // when shuffling, we can increase the number of partitions val coalesced6 = data.coalesce(20, shuffle = true) - assert(coalesced6.partitions.size === 20) + 
assert(coalesced6.partitions.length === 20) assert(coalesced6.collect().toSet === (1 to 10).toSet) } @@ -564,13 +564,13 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val coalesced2 = data2.coalesce(partitions) // test that we have 10000 partitions - assert(coalesced2.partitions.size == 10000, "Expected 10000 partitions, but got " + - coalesced2.partitions.size) + assert(coalesced2.partitions.length == 10000, "Expected 10000 partitions, but got " + + coalesced2.partitions.length) // test that we have 100 partitions val coalesced3 = data2.coalesce(numMachines * 2) - assert(coalesced3.partitions.size == 100, "Expected 100 partitions, but got " + - coalesced3.partitions.size) + assert(coalesced3.partitions.length == 100, "Expected 100 partitions, but got " + + coalesced3.partitions.length) // test that the groups are load balanced with 100 +/- 20 elements in each val maxImbalance3 = coalesced3.partitions @@ -613,9 +613,9 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) - assert(prunedRdd.partitions.size === 1) + assert(prunedRdd.partitions.length === 1) val prunedData = prunedRdd.collect() - assert(prunedData.size === 1) + assert(prunedData.length === 1) assert(prunedData(0) === 10) } @@ -626,7 +626,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { test("take") { var nums = sc.makeRDD(Range(1, 1000), 1) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -635,7 +635,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 2) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -644,7 +644,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 100) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -653,7 +653,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.makeRDD(Range(1, 1000), 1000) - assert(nums.take(0).size === 0) + assert(nums.take(0).length === 0) assert(nums.take(1) === Array(1)) assert(nums.take(3) === Array(1, 2, 3)) assert(nums.take(500) === (1 to 500).toArray) @@ -662,7 +662,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { assert(nums.take(1000) === (1 to 999).toArray) nums = sc.parallelize(1 to 2, 2) - assert(nums.take(2147483638).size === 2) + assert(nums.take(2147483638).length === 2) assert(nums.takeAsync(2147483638).get().size === 2) } @@ -670,7 +670,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Seq.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) val topK = ints.top(5) - assert(topK.size === 5) + assert(topK.length === 5) assert(topK === nums.reverse.take(5)) 
} @@ -679,7 +679,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { implicit val ord = implicitly[Ordering[String]].reverse val rdd = sc.makeRDD(words, 2) val topK = rdd.top(2) - assert(topK.size === 2) + assert(topK.length === 2) assert(topK.sorted === Array("b", "a")) } @@ -687,7 +687,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedLowerK = rdd.takeOrdered(5) - assert(sortedLowerK.size === 5) + assert(sortedLowerK.length === 5) assert(sortedLowerK === Array(1, 2, 3, 4, 5)) } @@ -695,7 +695,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedLowerK = rdd.takeOrdered(0) - assert(sortedLowerK.size === 0) + assert(sortedLowerK.length === 0) } test("SPARK-40276: takeOrdered with empty RDDs") { @@ -708,7 +708,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { implicit val ord = implicitly[Ordering[Int]].reverse val rdd = sc.makeRDD(nums.toImmutableArraySeq, 2) val sortedTopK = rdd.takeOrdered(5) - assert(sortedTopK.size === 5) + assert(sortedTopK.length === 5) assert(sortedTopK === Array(10, 9, 8, 7, 6)) assert(sortedTopK === nums.sorted(ord).take(5)) } @@ -736,48 +736,48 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { for (num <- List(5, 20, 100)) { val sample = data.takeSample(withReplacement = false, num = num) - assert(sample.size === num) // Got exactly num elements + assert(sample.length === num) // Got exactly num elements assert(sample.toSet.size === num) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = false, 20, seed) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.toSet.size === 20) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = false, 100, seed) - assert(sample.size === 100) // Got only 100 elements + assert(sample.length === 100) // Got only 100 elements assert(sample.toSet.size === 100) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = true, 20, seed) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { val sample = data.takeSample(withReplacement = true, num = 20) - assert(sample.size === 20) // Got exactly 20 elements + assert(sample.length === 20) // Got exactly 20 elements assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { val sample = data.takeSample(withReplacement = true, num = n) - assert(sample.size === n) // Got exactly n elements + assert(sample.length === n) // Got exactly n elements // Chance of getting all distinct elements is astronomically low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement 
= true, n, seed) - assert(sample.size === n) // Got exactly n elements + assert(sample.length === n) // Got exactly n elements // Chance of getting all distinct elements is astronomically low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") } for (seed <- 1 to 5) { val sample = data.takeSample(withReplacement = true, 2 * n, seed) - assert(sample.size === 2 * n) // Got exactly 2 * n elements + assert(sample.length === 2 * n) // Got exactly 2 * n elements // Chance of getting all distinct elements is still quite low, so test we got < n assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") } @@ -794,7 +794,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { val data = sc.parallelize(1 to n, 2) for(seed <- 1 to 5) { val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed) - assert(splits.size == 3, "wrong number of splits") + assert(splits.length == 3, "wrong number of splits") assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList, "incomplete or wrong split") val s = splits.map(_.count()) @@ -1179,7 +1179,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { sc.hadoopFile(outDir, classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) val coalescedHadoopRDD = hadoopRDD.coalesce(2, partitionCoalescer = Option(new SizeBasedCoalescer(maxSplitSize))) - assert(coalescedHadoopRDD.partitions.size <= 10) + assert(coalescedHadoopRDD.partitions.length <= 10) var totalPartitionCount = 0L coalescedHadoopRDD.partitions.foreach(partition => { var splitSizeSum = 0L @@ -1256,7 +1256,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { .map(coalescedRDD.getPreferredLocations(_).head) .groupBy(identity) .view - .mapValues(_.size) + .mapValues(_.length) // Make sure the coalesced partitions are distributed fairly evenly between the two locations. // This should not become flaky since the DefaultPartitionsCoalescer uses a fixed seed. 
@@ -1357,7 +1357,7 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Seria totalSum += splitSize } - while (index < partitions.size) { + while (index < partitions.length) { val partition = partitions(index) val fileSplit = partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit] diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala index 802889b047796..5771e99b64c69 100644 --- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala @@ -35,7 +35,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey() - assert(sorted.partitions.size === 2) + assert(sorted.partitions.length === 2) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -44,7 +44,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey(true, 1) - assert(sorted.partitions.size === 1) + assert(sorted.partitions.length === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -53,7 +53,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers { val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr.toImmutableArraySeq, 2) val sorted = pairs.sortByKey(true, 20) - assert(sorted.partitions.size === 20) + assert(sorted.partitions.length === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } diff --git a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala index 7079b9ea8eadc..c04719eb9ea6f 100644 --- a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.{SharedSparkContext, SparkFunSuite} object ZippedPartitionsSuite { def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = { - Iterator(i.toArray.size, s.toArray.size, d.toArray.size) + Iterator(i.toArray.length, s.toArray.length, d.toArray.length) } } @@ -35,7 +35,7 @@ class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext { val obtainedSizes = zippedRDD.collect() val expectedSizes = Array(2, 3, 1, 2, 3, 1) - assert(obtainedSizes.size == 6) + assert(obtainedSizes.length == 6) assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2)) } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index fd7018f189e26..be38315cd75fe 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -374,7 +374,7 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { rprof.require(eReq) // Update this if new resource type added - assert(ResourceProfile.allSupportedExecutorResources.size === 5, + assert(ResourceProfile.allSupportedExecutorResources.length === 5, "Executor resources should have 5 supported resources") 
assert(rprof.build().getCustomExecutorResources().size === 1, "Executor resources should have 1 custom resource") diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 1ab9f7c5d2b0c..20d6cc7671582 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -101,13 +101,13 @@ class ResourceUtilsSuite extends SparkFunSuite val gpuValue = resources.get(GPU) assert(gpuValue.nonEmpty, "Should have a gpu entry") assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.length == 2, "Should have 2 indexes") assert(gpuValue.get.addresses.sameElements(Array("0", "1")), "should have 0,1 entries") val fpgaValue = resources.get(FPGA) assert(fpgaValue.nonEmpty, "Should have a gpu entry") assert(fpgaValue.get.name == "fpga", "name should be fpga") - assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes") + assert(fpgaValue.get.addresses.length == 3, "Should have 3 indexes") assert(fpgaValue.get.addresses.sameElements(Array("f1", "f2", "f3")), "should have f1,f2,f3 entries") } @@ -231,7 +231,7 @@ class ResourceUtilsSuite extends SparkFunSuite val gpuValue = resources.get(GPU) assert(gpuValue.nonEmpty, "Should have a gpu entry") assert(gpuValue.get.name == "gpu", "name should be gpu") - assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes") + assert(gpuValue.get.addresses.length == 2, "Should have 2 indexes") assert(gpuValue.get.addresses.sameElements(Array("0", "1")), "should have 0,1 entries") } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala index 3f8eaede6e799..84f9ef0d557e6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/AQEShuffledRDD.scala @@ -48,7 +48,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A result } - override def numPartitions: Int = partitionStartIndices.size + override def numPartitions: Int = partitionStartIndices.length override def getPartition(key: Any): Int = { parentPartitionMapping(parent.getPartition(key)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index bf5e9d96cd80e..e9b8ae4bffe6d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -62,7 +62,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } assert(thrown.getMessage.contains("using broadcast variables for large values")) val smaller = sc.parallelize(1 to 4).collect() - assert(smaller.size === 4) + assert(smaller.length === 4) } test("compute max number of concurrent tasks can be launched") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0f7146bc7c150..c55f627075e8f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -462,9 
+462,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= taskEndInfos.size) + assert(taskSet.tasks.length >= taskEndInfos.size) for ((result, i) <- taskEndInfos.zipWithIndex) { - if (i < taskSet.tasks.size) { + if (i < taskSet.tasks.length) { runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2)) } } @@ -474,9 +474,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti accumId: Long, taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = { - assert(taskSet.tasks.size >= results.size) + assert(taskSet.tasks.length >= results.size) for ((result, i) <- results.zipWithIndex) { - if (i < taskSet.tasks.size) { + if (i < taskSet.tasks.length) { runEvent(makeCompletionEvent( taskSet.tasks(i), result._1, @@ -1671,21 +1671,21 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 0) // should work because it's a non-failed host (so the available map outputs will increase) runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostB", reduceRdd.partitions.size))) + makeMapStatus("hostB", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 1) // should be ignored for being too old runEvent(makeCompletionEvent( taskSet.tasks(0), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 1) // should work because it's a new epoch, which will increase the number of available map @@ -1694,7 +1694,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(makeCompletionEvent( taskSet.tasks(1), Success, - makeMapStatus("hostA", reduceRdd.partitions.size))) + makeMapStatus("hostA", reduceRdd.partitions.length))) assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) @@ -2081,7 +2081,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler // should re-submit the stage with one task (the task that originally ran on HostA). assert(taskSets.size === 2) - assert(taskSets(1).tasks.size === 1) + assert(taskSets(1).tasks.length === 1) // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on @@ -2735,7 +2735,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // Now complete tasks in the second task set val newTaskSet = taskSets(1) // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA). - assert(newTaskSet.tasks.size === 2) + assert(newTaskSet.tasks.length === 2) // Complete task 0 from the original task set (i.e., not the one that's currently active). // This should still be counted towards the job being complete (but there's still one // outstanding task). 
@@ -2878,7 +2878,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // failed hostA, so both should be resubmitted. Complete them on hostB successfully. scheduler.resubmitFailedStages() assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1 - && taskSets(2).tasks.size === 2) + && taskSets(2).tasks.length === 2) complete(taskSets(2), Seq( (Success, makeMapStatus("hostB", 2)), (Success, makeMapStatus("hostB", 2)))) @@ -2898,7 +2898,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt // is still running. assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 - && taskSets(3).tasks.size === 2) + && taskSets(3).tasks.length === 2) runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) @@ -2907,7 +2907,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // was ignored due to executor failure assert(taskSets.size === 5) assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2 - && taskSets(4).tasks.size === 1) + && taskSets(4).tasks.length === 1) // Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully. runEvent(makeCompletionEvent( @@ -4445,7 +4445,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // a scenario where stage 0 needs to be resubmitted upon finishing all tasks. // Merge finalization should be scheduled in this case. for ((result, i) <- taskResults.zipWithIndex) { - if (i == taskSets(0).tasks.size - 1) { + if (i == taskSets(0).tasks.length - 1) { mapOutputTracker.removeOutputsOnHost("host0") } runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2)) @@ -4522,7 +4522,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti // a scenario where stage 0 needs to be resubmitted upon finishing all tasks. // Merge finalization should be scheduled in this case. for ((result, i) <- taskResults.zipWithIndex) { - if (i == taskSets(0).tasks.size - 1) { + if (i == taskSets(0).tasks.length - 1) { mapOutputTracker.removeOutputsOnHost("host0") } runEvent(makeCompletionEvent(taskSets(0).tasks(i), result._1, result._2)) @@ -4986,7 +4986,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti * Note that this checks only the host and not the executor ID. 
*/ private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]): Unit = { - assert(hosts.size === taskSet.tasks.size) + assert(hosts.size === taskSet.tasks.length) for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) { assert(taskLocs.map(_.host).toSet === expectedLocs.toSet) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index cf2240a0511d7..13e7ff758ebaf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -268,7 +268,7 @@ class MapStatusSuite extends SparkFunSuite { "number of skewed block sizes") val smallAndUntrackedBlocks = - nonEmptyBlocks.slice(0, nonEmptyBlocks.size - trackedSkewedBlocksLength) + nonEmptyBlocks.slice(0, nonEmptyBlocks.length - trackedSkewedBlocksLength) val avg = smallAndUntrackedBlocks.sum / smallAndUntrackedBlocks.length val loc = BlockManagerId("a", "b", 10) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 0533f9d7d8a49..f1a4b97c2981d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -143,14 +143,14 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val rdd = sc.parallelize(Seq(1), 1) sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _, rdd.partitions.indices) - assert(tempDir.list().size === 1) + assert(tempDir.list().length === 1) } ignore("If commit fails, if task is retried it should not be locked, and will succeed.") { val rdd = sc.parallelize(Seq(1), 1) sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _, rdd.partitions.indices) - assert(tempDir.list().size === 1) + assert(tempDir.list().length === 1) } test("Job should not complete if all commits are denied") { @@ -161,13 +161,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { def resultHandler(x: Int, y: Unit): Unit = {} val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully, - 0 until rdd.partitions.size, resultHandler, ()) + 0 until rdd.partitions.length, resultHandler, ()) // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. 
intercept[TimeoutException] { ThreadUtils.awaitResult(futureAction, 5.seconds) } - assert(tempDir.list().size === 0) + assert(tempDir.list().length === 0) } test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f0ae7fc74112b..2ab7df0d9cfd3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1815,10 +1815,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.size == 2) { + if (tDesc.resources(GPU).addresses.length == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.size == 1) { + if (tDesc.resources(GPU).addresses.length == 1) { has1Gpu += 1 } } @@ -1836,7 +1836,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + assert(2 == taskDescriptions.head.resources(GPU).addresses.length) } test("Scheduler works with task resource profiles") { @@ -1875,10 +1875,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.size == 2) { + if (tDesc.resources(GPU).addresses.length == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.size == 1) { + if (tDesc.resources(GPU).addresses.length == 1) { has1Gpu += 1 } } @@ -1896,7 +1896,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + assert(2 == taskDescriptions.head.resources(GPU).addresses.length) } test("Calculate available tasks slots for task resource profiles") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2fe50a486dbd6..2f8b6df8beac5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -845,7 +845,7 @@ class TaskSetManagerSuite // multiple 1k result val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect() - assert(10 === r.size) + assert(10 === r.length) // single 10M result val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()} @@ -863,7 +863,7 @@ class TaskSetManagerSuite sc = new SparkContext("local", "test", conf) // final result is below limit. 
val r = sc.makeRDD(0 until 2000, 2000).distinct(10).filter(_ == 0).collect() - assert(1 === r.size) + assert(1 === r.length) } test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index 4acb4bbc779c3..25db9a5c68612 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -48,7 +48,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex val shuffledRDD = cachedRDD.map { case (i, o) => (i * i * i - 10 * i * i, o)} // Join the two RDDs, and force evaluation - assert(shuffledRDD.join(cachedRDD).collect().size == 1) + assert(shuffledRDD.join(cachedRDD).collect().length == 1) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala index 8a9537b4f18d7..a9ca9135f38a9 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -236,7 +236,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite { ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId), dirs) assert(mergedBlockMeta.getNumChunks === 3) - assert(mergedBlockMeta.readChunkBitmaps().size === 3) + assert(mergedBlockMeta.readChunkBitmaps().length === 3) assert(mergedBlockMeta.readChunkBitmaps()(0).contains(1)) assert(mergedBlockMeta.readChunkBitmaps()(0).contains(2)) assert(!mergedBlockMeta.readChunkBitmaps()(0).contains(3)) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index ccf6c9184cc96..f2b795764b7e8 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -170,40 +170,44 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(actualQuantiles === expectedQuantiles) } - assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime) - assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime) - assertQuantiles(_.executorRunTime, summary.executorRunTime) - assertQuantiles(_.executorRunTime, summary.executorRunTime) - assertQuantiles(_.executorCpuTime, summary.executorCpuTime) - assertQuantiles(_.resultSize, summary.resultSize) - assertQuantiles(_.jvmGCTime, summary.jvmGcTime) - assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime) - assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled) - assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled) - assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory) - assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead) - assertQuantiles(_.inputMetrics.recordsRead, summary.inputMetrics.recordsRead) - assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten) - assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten) - assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched, + assertQuantiles(_.executorDeserializeTime.toDouble, summary.executorDeserializeTime) + 
assertQuantiles(_.executorDeserializeCpuTime.toDouble, summary.executorDeserializeCpuTime) + assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime) + assertQuantiles(_.executorRunTime.toDouble, summary.executorRunTime) + assertQuantiles(_.executorCpuTime.toDouble, summary.executorCpuTime) + assertQuantiles(_.resultSize.toDouble, summary.resultSize) + assertQuantiles(_.jvmGCTime.toDouble, summary.jvmGcTime) + assertQuantiles(_.resultSerializationTime.toDouble, summary.resultSerializationTime) + assertQuantiles(_.memoryBytesSpilled.toDouble, summary.memoryBytesSpilled) + assertQuantiles(_.diskBytesSpilled.toDouble, summary.diskBytesSpilled) + assertQuantiles(_.peakExecutionMemory.toDouble, summary.peakExecutionMemory) + assertQuantiles(_.inputMetrics.bytesRead.toDouble, summary.inputMetrics.bytesRead) + assertQuantiles(_.inputMetrics.recordsRead.toDouble, summary.inputMetrics.recordsRead) + assertQuantiles(_.outputMetrics.bytesWritten.toDouble, summary.outputMetrics.bytesWritten) + assertQuantiles(_.outputMetrics.recordsWritten.toDouble, + summary.outputMetrics.recordsWritten) + assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched.toDouble, summary.shuffleReadMetrics.remoteBlocksFetched) - assertQuantiles(_.shuffleReadMetrics.localBlocksFetched, + assertQuantiles(_.shuffleReadMetrics.localBlocksFetched.toDouble, summary.shuffleReadMetrics.localBlocksFetched) - assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, + assertQuantiles(_.shuffleReadMetrics.fetchWaitTime.toDouble, summary.shuffleReadMetrics.fetchWaitTime) - assertQuantiles(_.shuffleReadMetrics.remoteBytesRead, + assertQuantiles(_.shuffleReadMetrics.remoteBytesRead.toDouble, summary.shuffleReadMetrics.remoteBytesRead) - assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk, + assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk.toDouble, summary.shuffleReadMetrics.remoteBytesReadToDisk) assertQuantiles( - t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead, + t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead.toDouble, summary.shuffleReadMetrics.readBytes) assertQuantiles( - t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched, + t => t.shuffleReadMetrics.localBlocksFetched + + t.shuffleReadMetrics.remoteBlocksFetched.toDouble, summary.shuffleReadMetrics.totalBlocksFetched) - assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes) - assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime) - assertQuantiles(_.shuffleWriteMetrics.recordsWritten, + assertQuantiles(_.shuffleWriteMetrics.bytesWritten.toDouble, + summary.shuffleWriteMetrics.writeBytes) + assertQuantiles(_.shuffleWriteMetrics.writeTime.toDouble, + summary.shuffleWriteMetrics.writeTime) + assertQuantiles(_.shuffleWriteMetrics.recordsWritten.toDouble, summary.shuffleWriteMetrics.writeRecords) } finally { appStore.close() diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala index be1b9be2d85d9..b644224652266 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala @@ -117,7 +117,7 @@ class DiskStoreSuite extends SparkFunSuite { val chunkedByteBuffer = blockData.toChunkedByteBuffer(ByteBuffer.allocate) val chunks = chunkedByteBuffer.chunks - assert(chunks.size === 2) + assert(chunks.length === 2) for (chunk 
<- chunks) { assert(chunk.limit() === 10 * 1024) } diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index c377f2495d05d..35ef0587b9b4c 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -192,9 +192,9 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter { // verify whether the earliest file has been deleted val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted - logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + + logInfo(s"All rolled over files generated:${rolledOverFiles.length}\n" + rolledOverFiles.mkString("\n")) - assert(rolledOverFiles.size > 2) + assert(rolledOverFiles.length > 2) val earliestRolledOverFile = rolledOverFiles.head val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles( testFile.getParentFile.toString, testFile.getName).map(_.toString) diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala index 8aa4be6c2ff8d..82a4c85b02fa0 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala @@ -104,7 +104,7 @@ private object SizeTrackerSuite { * Run speed tests for size tracking collections. */ def main(args: Array[String]): Unit = { - if (args.size < 1) { + if (args.length < 1) { // scalastyle:off println println("Usage: SizeTrackerSuite [num elements]") // scalastyle:on println diff --git a/dev/create-release/spark-rm/Dockerfile b/dev/create-release/spark-rm/Dockerfile index dbb851d74a565..9cfe78570421e 100644 --- a/dev/create-release/spark-rm/Dockerfile +++ b/dev/create-release/spark-rm/Dockerfile @@ -37,12 +37,7 @@ ENV DEBCONF_NONINTERACTIVE_SEEN true # These arguments are just for reuse and not really meant to be customized. ARG APT_INSTALL="apt-get install --no-install-recommends -y" -# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. -# See also https://github.com/sphinx-doc/sphinx/issues/7551. -# We should use the latest Sphinx version once this is fixed. -# TODO(SPARK-35375): Jinja2 3.0.0+ causes error when building with Sphinx. -# See also https://issues.apache.org/jira/browse/SPARK-35375. -ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.8.0 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==2.11.3 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.59.3 protobuf==4.21.6 grpcio-status==1.59.3 googleapis-common-protos==1.56.4" +ARG PIP_PKGS="sphinx==4.2.0 mkdocs==1.1.2 numpy==1.20.3 pydata_sphinx_theme==0.13.3 ipython==7.19.0 nbsphinx==0.8.0 numpydoc==1.1.0 jinja2==3.1.2 twine==3.4.1 sphinx-plotly-directive==0.1.3 sphinx-copybutton==0.5.2 pandas==1.5.3 pyarrow==3.0.0 plotly==5.4.0 markupsafe==2.0.1 docutils<0.17 grpcio==1.59.3 protobuf==4.21.6 grpcio-status==1.59.3 googleapis-common-protos==1.56.4" ARG GEM_PKGS="bundler:2.3.8" # Install extra needed repos and refresh. 
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 5a96f3fe9b982..3a8c3dc707aa1 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -45,7 +45,7 @@ commons-crypto/1.1.0//commons-crypto-1.1.0.jar commons-dbcp/1.4//commons-dbcp-1.4.jar commons-io/2.15.0//commons-io-2.15.0.jar commons-lang/2.6//commons-lang-2.6.jar -commons-lang3/3.13.0//commons-lang3-3.13.0.jar +commons-lang3/3.14.0//commons-lang3-3.14.0.jar commons-logging/1.1.3//commons-logging-1.1.3.jar commons-math3/3.6.1//commons-math3-3.6.1.jar commons-pool/1.5.4//commons-pool-1.5.4.jar diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index 10ae49b71665f..7348c6af1e059 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -138,4 +138,5 @@ RUN python3.12 -m pip install numpy 'pyarrow>=14.0.0' 'six==1.16.0' 'pandas<=2.1 RUN python3.12 -m pip install 'grpcio==1.59.3' 'grpcio-status==1.59.3' 'protobuf==4.25.1' 'googleapis-common-protos==1.56.4' # TODO(SPARK-46078) Use official one instead of nightly build when it's ready RUN python3.12 -m pip install --pre torch --index-url https://download.pytorch.org/whl/nightly/cpu +RUN python3.12 -m pip install torchvision --index-url https://download.pytorch.org/whl/cpu RUN python3.12 -m pip install torcheval diff --git a/dev/requirements.txt b/dev/requirements.txt index 66a74471377dd..2d139911bacb6 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -1,11 +1,11 @@ # PySpark dependencies (required) -py4j +py4j>=0.10.9.7 # PySpark dependencies (optional) -numpy -pyarrow +numpy>=1.21 +pyarrow>=4.0.0 six==1.16.0 -pandas +pandas>=1.4.4 scipy plotly mlflow>=2.3.1 @@ -31,12 +31,12 @@ pandas-stubs<1.2.0.54 mkdocs # Documentation (Python) -pydata_sphinx_theme +pydata_sphinx_theme>=0.13 ipython nbsphinx numpydoc -jinja2<3.0.0 -sphinx<3.1.0 +jinja2 +sphinx==4.2.0 sphinx-plotly-directive sphinx-copybutton docutils<0.18.0 @@ -52,8 +52,8 @@ black==23.9.1 py # Spark Connect (required) -grpcio==1.59.3 -grpcio-status==1.59.3 +grpcio>=1.59.3 +grpcio-status>=1.59.3 protobuf==4.25.1 googleapis-common-protos>=1.56.4 diff --git a/docs/README.md b/docs/README.md index 87d68c2f86499..99ccf69dbaee5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -52,13 +52,6 @@ Note: If you are on a system with both Ruby 1.9 and Ruby 2.0 you may need to rep To generate SQL and Python API docs, you'll need to install these libraries: - Run the following command from $SPARK_HOME: ```sh $ pip install --upgrade -r dev/requirements.txt diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index ce739cb90b531..2ab68d2a8049f 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -518,6 +518,8 @@ Spark applications supports the following configuration properties specific to s # Launching Spark Applications +## Spark Protocol + The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to submit a compiled Spark application to the cluster. For standalone clusters, Spark currently supports two deploy modes. In `client` mode, the driver is launched in the same process as the @@ -540,6 +542,84 @@ failing repeatedly, you may do so through: You can find the driver ID through the standalone Master web UI at `http://:8080`. 
+## REST API
+
+If `spark.master.rest.enabled` is enabled, the Spark master provides an additional REST API via
+`http://[host:port]/[version]/submissions/[action]`, where
+`host` is the master host,
+`port` is the port number specified by `spark.master.rest.port` (default: 6066),
+`version` is the protocol version (`v1` as of today), and
+`action` is one of the following supported actions.
+
+<table class="table">
+  <tr><th>Command</th><th>Description</th><th>HTTP METHOD</th><th>Since Version</th></tr>
+  <tr><td>create</td><td>Create a Spark driver via cluster mode.</td><td>POST</td><td>1.3.0</td></tr>
+  <tr><td>kill</td><td>Kill a single Spark driver.</td><td>POST</td><td>1.3.0</td></tr>
+  <tr><td>killall</td><td>Kill all running Spark drivers.</td><td>POST</td><td>4.0.0</td></tr>
+  <tr><td>status</td><td>Check the status of a Spark job.</td><td>GET</td><td>1.3.0</td></tr>
+  <tr><td>clear</td><td>Clear the completed drivers and applications.</td><td>POST</td><td>4.0.0</td></tr>
+</table>
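Besides the `create` action, which the full `curl` example below demonstrates, the `kill` and `status` actions take a submission ID in the URL path (for instance `/v1/submissions/status/<submissionId>`). The following sketch issues such a status check from Scala; the host, port, and driver ID are placeholders, and any HTTP client (including plain `curl`) works equally well.

```scala
// Illustrative sketch only: query the status of a previously submitted driver.
// The host, port, and submission ID below are placeholders.
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object CheckDriverStatus {
  def main(args: Array[String]): Unit = {
    val url = new URL("http://master-host:6066/v1/submissions/status/driver-20231124153531-0000")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    // The master answers with a JSON document describing the driver's state.
    val body = Source.fromInputStream(conn.getInputStream).mkString
    println(body)
    conn.disconnect()
  }
}
```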
+ +The following is a curl CLI command example with the `pi.py` and REST API. + +```bash +$ curl -XPOST http://IP:PORT/v1/submissions/create \ +--header "Content-Type:application/json;charset=UTF-8" \ +--data '{ + "appResource": "", + "sparkProperties": { + "spark.master": "spark://master:7077", + "spark.app.name": "Spark Pi", + "spark.driver.memory": "1g", + "spark.driver.cores": "1", + "spark.jars": "" + }, + "clientSparkVersion": "", + "mainClass": "org.apache.spark.deploy.SparkSubmit", + "environmentVariables": { }, + "action": "CreateSubmissionRequest", + "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ] +}' +``` + +The following is the response from the REST API for the above create request. + +```bash +{ + "action" : "CreateSubmissionResponse", + "message" : "Driver successfully submitted as driver-20231124153531-0000", + "serverSparkVersion" : "4.0.0", + "submissionId" : "driver-20231124153531-0000", + "success" : true +} +``` + + # Resource Scheduling The standalone cluster mode currently only supports a simple FIFO scheduler across applications. diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index c0f88bffa6e5b..71abf10da328b 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -643,7 +643,7 @@ Column expression `` cannot be sorted because its type `` is not [SQLSTATE: 39000](sql-error-conditions-sqlstates.html#class-39-external-routine-invocation-exception) -Failed to execute user defined function (``: (``) => ``). +User defined function (``: (``) => ``) failed due to: ``. ### FAILED_FUNCTION_CALL diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 5c00ce6558513..664bccf26651b 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -29,6 +29,7 @@ license: | - Since Spark 4.0, `spark.sql.hive.metastore` drops the support of Hive prior to 2.0.0 as they require JDK 8 that Spark does not support anymore. Users should migrate to higher versions. - Since Spark 4.0, `spark.sql.parquet.compression.codec` drops the support of codec name `lz4raw`, please use `lz4_raw` instead. - Since Spark 4.0, when overflowing during casting timestamp to byte/short/int under non-ansi mode, Spark will return null instead a wrapping value. +- Since Spark 4.0, the `encode()` function supports only the following charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'. To restore the previous behavior when the function accepts charsets of the current JDK used by Spark, set `spark.sql.legacy.javaCharsets` to `true`. ## Upgrading from Spark SQL 3.4 to 3.5 diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 547834c7f9e3a..33b9453a18c37 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2452,6 +2452,14 @@ Specifically for built-in HDFS state store provider, users can check the state s it is best if cache missing count is minimized that means Spark won't waste too much time on loading checkpointed state. User can increase Spark locality waiting configurations to avoid loading state store providers in different executors across batches. +#### State Data Source (Experimental) + +Apache Spark provides a streaming state related data source that provides the ability to manipulate state stores in the checkpoint. Users can run the batch query with State Data Source to get the visibility of the states for existing streaming query. 
+ +As of Spark 4.0, the data source only supports read feature. See [State Data Source Integration Guide](structured-streaming-state-data-source.html) for more details. + +NOTE: this data source is currently marked as experimental - source options and the behavior (output) might be subject to change. + ## Starting Streaming Queries Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter` ([Python](api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter)/[Scala](api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html) docs) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md new file mode 100644 index 0000000000000..a9353861c532c --- /dev/null +++ b/docs/structured-streaming-state-data-source.md @@ -0,0 +1,248 @@ +--- +layout: global +displayTitle: State Data Source Integration Guide +title: State Data Source Integration Guide +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +State data source Guide in Structured Streaming (Experimental) + +## Overview + +State data source provides functionality to manipulate the state from the checkpoint. + +As of Spark 4.0, state data source provides the read functionality with a batch query. Additional functionalities including write is on the future roadmap. + +NOTE: this data source is currently marked as experimental - source options and the behavior (output) might be subject to change. + +## Reading state key-values from the checkpoint + +State data source enables reading key-value pairs from the state store in the checkpoint, via running a separate batch query. +Users can leverage the functionality to cover two major use cases described below: + +* Construct a test checking both output and the state. It is non-trivial to deduce the key-value of the state from the output, and having visibility of the state would be a huge win on testing. +* Investigate an incident against stateful streaming query. If users observe the incorrect output and want to track how it came up, having visibility of the state would be required. + +Users can read an instance of state store, which is matched to a single stateful operator in most cases. This means, users can expect that they can read the entire key-value pairs in the state for a single stateful operator. + +Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and +provides a user-friendly approach to read the state. 
See the section for stream-stream join for more details.
+
+### Creating a State store for Batch Queries (all defaults)
+
+<div class="codetabs">
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+df = spark \
+.read \
+.format("statestore") \
+.load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val df = spark
+.read
+.format("statestore")
+.load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Dataset<Row> df = spark
+.read()
+.format("statestore")
+.load("<checkpointLocation>");
+
+{% endhighlight %}
+</div>
+
+</div>
+
+Each row in the source has the following schema:
+
+<table class="table">
+  <tr><th>Column</th><th>Type</th><th>Note</th></tr>
+  <tr><td>key</td><td>struct (depends on the type for state key)</td><td></td></tr>
+  <tr><td>value</td><td>struct (depends on the type for state value)</td><td></td></tr>
+  <tr><td>_partition_id</td><td>int</td><td>metadata column (hidden unless specified with SELECT)</td></tr>
+</table>
+
+The nested columns for key and value depend heavily on the input schema of the stateful operator as well as the type of operator.
+Users are encouraged to query the schema via df.schema() / df.printSchema() first to understand the type of output.
+
+The following options must be set for the source.
+
+<table class="table">
+  <tr><th>Option</th><th>value</th><th>meaning</th></tr>
+  <tr><td>path</td><td>string</td><td>Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`).</td></tr>
+</table>
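After loading the state for a stateful operator, it is worth calling `printSchema()` before projecting anything, since the key/value structs mirror the operator's grouping and aggregation schema. A minimal sketch, assuming a hypothetical checkpoint at `/tmp/checkpoint` whose aggregation is keyed by a `user` column and stores a `count` value (the path and field names are illustrative, not part of the API):

```scala
// Illustrative sketch: the checkpoint path and the nested field names ("user", "count")
// are assumptions that depend on the actual streaming query; `spark` is the active SparkSession.
val stateDf = spark.read
  .format("statestore")
  .load("/tmp/checkpoint")

// Inspect the key/value structs produced for this particular operator.
stateDf.printSchema()

// Project nested fields plus the hidden partition metadata column.
stateDf.selectExpr("key.user", "value.count", "_partition_id").show()
```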
+
+The following configurations are optional:
+
+<table class="table">
+  <tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr>
+  <tr><td>batchId</td><td>numeric value</td><td>latest committed batch</td><td>Represents the target batch to read from. This option is used when users want to perform time-travel. The batch should be committed but not yet cleaned up.</td></tr>
+  <tr><td>operatorId</td><td>numeric value</td><td>0</td><td>Represents the target operator to read from. This option is used when the query is using multiple stateful operators.</td></tr>
+  <tr><td>storeName</td><td>string</td><td>DEFAULT</td><td>Represents the target state store name to read from. This option is used when the stateful operator uses multiple state store instances. It is not required except for stream-stream join.</td></tr>
+  <tr><td>joinSide</td><td>string ("left" or "right")</td><td>(none)</td><td>Represents the target side to read from. This option is used when users want to read the state from stream-stream join.</td></tr>
+</table>
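To make the option semantics concrete, the sketch below combines a time-travel read with a stream-stream join read. The checkpoint path, batch ID, and operator ID are placeholders rather than values from a real query.

```scala
// Illustrative sketch: `spark` is the active SparkSession and the paths/IDs are placeholders.

// Time-travel: read the state as of a specific committed (not yet cleaned up) batch.
val stateAtBatch5 = spark.read
  .format("statestore")
  .option("batchId", 5L)
  .option("operatorId", 0L)
  .load("/tmp/checkpoint")

// Stream-stream join: read only the buffered input rows of the left side.
// Note that joinSide and storeName cannot be specified together.
val leftSideState = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/tmp/checkpoint")
```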
+
+### Reading state for Stream-stream join
+
+Structured Streaming implements the stream-stream join feature by leveraging multiple state store instances internally.
+These instances logically compose the buffers that store the input rows for the left and right sides.
+
+Since it is easier for users to reason about the two sides of the join than about the internal buffers, the data source provides the option 'joinSide' to read the buffered input for a specific side of the join.
+To still allow reading an internal state store instance directly, the data source also accepts the option 'storeName', with the restriction that 'storeName' and 'joinSide' cannot be specified together.
+
+## State metadata source
+
+Before querying the state from an existing checkpoint via the state data source, users typically want to understand the checkpoint itself, especially its stateful operators: which operators and state store instances are available in the checkpoint, the available range of batch IDs, and so on.
+
+Structured Streaming provides a data source named "State metadata source" that exposes the state-related metadata information from the checkpoint.
+
+Note: The metadata is constructed while the streaming query runs with Spark 4.0+. An existing checkpoint produced with a lower Spark version does not have the metadata and cannot be queried with this metadata source. It is required to run the streaming query against the existing checkpoint with Spark 4.0+ to construct the metadata before querying it.
+
+### Creating a State metadata store for Batch Queries
+
+<div class="codetabs">
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+df = spark \
+.read \
+.format("state-metadata") \
+.load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val df = spark
+.read
+.format("state-metadata")
+.load("<checkpointLocation>")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Dataset<Row> df = spark
+.read()
+.format("state-metadata")
+.load("<checkpointLocation>");
+
+{% endhighlight %}
+</div>
+
+</div>
+
+Each row in the source has the following schema:
+
+<table class="table">
+  <tr><th>Column</th><th>Type</th><th>Note</th></tr>
+  <tr><td>operatorId</td><td>int</td><td></td></tr>
+  <tr><td>operatorName</td><td>string</td><td></td></tr>
+  <tr><td>stateStoreName</td><td>string</td><td></td></tr>
+  <tr><td>numPartitions</td><td>int</td><td></td></tr>
+  <tr><td>minBatchId</td><td>int</td><td>The minimum batch ID available for querying state. The value could be invalid if the streaming query taking the checkpoint is running, as cleanup would run.</td></tr>
+  <tr><td>maxBatchId</td><td>int</td><td>The maximum batch ID available for querying state. The value could be invalid if the streaming query taking the checkpoint is running, as the query will commit further batches.</td></tr>
+  <tr><td>_numColsPrefixKey</td><td>int</td><td>metadata column (hidden unless specified with SELECT)</td></tr>
+</table>
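As the discussion below explains, a typical workflow is to query the metadata source first to locate the right operator (and, for stream-stream join, the right store name), then read its state with the state data source. A minimal sketch, assuming a hypothetical checkpoint path and operator ID:

```scala
// Illustrative sketch: `spark` is the active SparkSession; the checkpoint path
// and the chosen operatorId are placeholders.
val meta = spark.read
  .format("state-metadata")
  .load("/tmp/checkpoint")

// List the stateful operators and the batch range recorded in the checkpoint metadata.
meta.select("operatorId", "operatorName", "stateStoreName", "minBatchId", "maxBatchId").show()

// Read the state of the operator identified above, e.g. the second stateful operator.
val state = spark.read
  .format("statestore")
  .option("operatorId", 1L)
  .load("/tmp/checkpoint")
```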
+ +One of the major use cases of this data source is to identify the operatorId to query if the query has multiple stateful operators, e.g. stream-stream join followed by deduplication. +The column 'operatorName' helps users to identify the operatorId for given operator. + +Additionally, if users want to query about an internal state store instance for a stateful operator (e.g. stream-stream join), the column 'stateStoreName' would be useful to determine the target. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index d7099c5c953c1..bc6fab45810eb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -87,7 +87,7 @@ object SVDPlusPlus { val gJoinT0 = g.outerJoinVertices(t0) { (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double), msg: Option[(Long, Double)]) => - (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1)) + (vd._1, vd._2, msg.get._2 / msg.get._1 - u, 1.0 / scala.math.sqrt(msg.get._1.toDouble)) }.cache() materialize(gJoinT0) g.unpersist() diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index caa2fdcdf5d2b..666790958c353 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -321,7 +321,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { val rank = if (vid < source) { 0.0 } else { - a * Math.pow(1 - resetProb, vid - source) + a * Math.pow(1 - resetProb, vid.toDouble - source) } vid -> rank } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 6e26a78e9c7e6..aa39a3e177eeb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -1418,7 +1418,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( case Row(label: Double, pred: Double, weight: Double) => (label, pred, weight) } - family.aic(t, deviance, numInstances, weightSum) + 2 * rank + family.aic(t, deviance, numInstances.toDouble, weightSum) + 2 * rank } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala index d7b13f1bf25f3..482bb7fdc2105 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/ANOVATest.scala @@ -224,7 +224,7 @@ private[ml] object ANOVATest { // mean square within val msw = sswn / dfwn val fValue = msb / msw - val pValue = 1 - new FDistribution(dfbn, dfwn).cumulativeProbability(fValue) + val pValue = 1 - new FDistribution(dfbn.toDouble, dfwn.toDouble).cumulativeProbability(fValue) val degreesOfFreedom = dfbn + dfwn (pValue, degreesOfFreedom, fValue) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala index 89579dfcbb0c3..e2ce6cf7214f7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/FValueTest.scala @@ -135,7 +135,7 @@ private[ml] 
object FValueTest { } else Iterator.empty }.reduceByKey(_ + _ ).mapPartitions { iter => - val fd = new FDistribution(1, degreesOfFreedom) + val fd = new FDistribution(1.0, degreesOfFreedom.toDouble) iter.map { case (col, sumForCov) => // Cov(X,Y) = Sum(((Xi - Avg(X)) * ((Yi-Avg(Y))) / (N-1) val covariance = sumForCov / (numSamples - 1) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index dbcf9017f1748..234ecbc460638 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -525,7 +525,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging { updateLambda(batchResult, batchSize) logphatOption.foreach(_ /= nonEmptyDocsN.toDouble) - logphatOption.foreach(updateAlpha(_, nonEmptyDocsN)) + logphatOption.foreach(updateAlpha(_, nonEmptyDocsN.toDouble)) this } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala index ed6e3ea966b26..17b28ed3eba5d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala @@ -106,7 +106,7 @@ class StreamingKMeansModel @Since("1.2.0") ( val numNewPoints = pointStats.iterator.map { case (_, (_, n)) => n }.sum - math.pow(decayFactor, numNewPoints) + math.pow(decayFactor, numNewPoints.toDouble) } // apply discount to weights diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala index 06c7754691953..79f482347289a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala @@ -91,8 +91,8 @@ class AssociationRules private[fpm] ( .map { case (antecedent, ((consequent, freqUnion), freqAntecedent)) => new Rule(antecedent.toArray, consequent.toArray, - freqUnion, - freqAntecedent, + freqUnion.toDouble, + freqAntecedent.toDouble, // the consequent contains always only one element itemSupport.get(consequent.head)) }.filter(_.confidence >= minConfidence) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 2bd4877ffc72e..37bf9d45f6646 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -633,7 +633,7 @@ class RowMatrix @Since("1.0.0") ( val gamma = if (threshold < 1e-6) { Double.PositiveInfinity } else { - 10 * math.log(numCols()) / threshold + 10 * math.log(numCols().toDouble) / threshold } val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("normL2")) @@ -823,7 +823,8 @@ class RowMatrix @Since("1.0.0") ( + s"as it's bigger than maxResultSize ($maxDriverResultSizeInBytes Bytes)") val numerator = math.log(rows.getNumPartitions) - val denominator = math.log(maxDriverResultSizeInBytes) - math.log(aggregatedObjectSizeInBytes) + val denominator = math.log(maxDriverResultSizeInBytes.toDouble) - + math.log(aggregatedObjectSizeInBytes.toDouble) val desiredTreeDepth = math.ceil(numerator / denominator) if (desiredTreeDepth > 4) { diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala index aa0bf51ebcd25..28c2b5d5027ab 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala @@ -70,7 +70,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging { val output = flush() preCol = j preVal = v - startRank = rank + startRank = rank.toDouble cachedUids += uid output } else { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index ead9f887fe811..d42df3e2f0ddf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -201,7 +201,7 @@ private[spark] object ChiSqTest extends Logging { counts.foreach { case ((label, value), c) => val i = value2Index(value) val j = label2Index(label) - contingency.update(i, j, c) + contingency.update(i, j, c.toDouble) } ChiSqTest.chiSquaredMatrix(contingency, methodName) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala index 8f3d0f8b3214c..cf0fd388fa749 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTestMethod.scala @@ -131,7 +131,7 @@ private[stat] object StudentTTest extends StreamingTestMethod with Logging { statsA: StatCounter, statsB: StatCounter): StreamingTestResult = { def studentDF(sample1: StatisticalSummaryValues, sample2: StatisticalSummaryValues): Double = - sample1.getN + sample2.getN - 2 + sample1.getN + sample2.getN - 2.0 new StreamingTestResult( tTester.get.homoscedasticTTest(statsA, statsB), diff --git a/pom.xml b/pom.xml index ac096a19804db..fce9c2b54e03a 100644 --- a/pom.xml +++ b/pom.xml @@ -197,7 +197,7 @@ 2.6 - 3.13.0 + 3.14.0 2.11.1 4.1.17 @@ -2978,7 +2978,7 @@ TODO(SPARK-33805): Undo the corresponding deprecated usage suppression rule after fixed. --> -Wconf:msg=^(?=.*?method|value|type|object|trait|inheritance)(?=.*?deprecated)(?=.*?since 2.13).+$:e - -Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:s + -Wconf:msg=^(?=.*?Widening conversion from)(?=.*?is deprecated because it loses precision).+$:e -Wconf:cat=deprecation&msg=Auto-application to \`\(\)\` is deprecated:e