
Revert "[SPARK-48883][ML][R] Replace RDD read / write API invocation with Dataframe read / write API"

This reverts commit 0fa5787.
HyukjinKwon committed Jul 13, 2024
1 parent b4cd2ec commit cc32137
Showing 24 changed files with 45 additions and 114 deletions.
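
For context, the reverted change had swapped the RDD text I/O used to persist each wrapper's one-line R metadata JSON for the DataFrame text I/O. The sketch below contrasts the two styles; it is illustrative only (the helper object and method names are hypothetical), assuming an active SparkSession and its SparkContext, with the method bodies taken from the hunks that follow.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object RMetadataIO {
  // DataFrame API (removed by this revert): wrap the JSON string in a
  // one-row DataFrame, force a single partition, and write it as text.
  def saveViaDataFrame(spark: SparkSession, json: String, path: String): Unit =
    spark.createDataFrame(Seq(Tuple1(json)))
      .repartition(1).write.text(path)

  // RDD API (restored by this revert): a one-partition RDD saved directly.
  def saveViaRdd(sc: SparkContext, json: String, path: String): Unit =
    sc.parallelize(Seq(json), 1).saveAsTextFile(path)

  // Reading the single metadata line back, in each style.
  def loadViaDataFrame(spark: SparkSession, path: String): String =
    spark.read.text(path).first().getString(0)

  def loadViaRdd(sc: SparkContext, path: String): String =
    sc.textFile(path, 1).first()
}

In both styles the single partition keeps the JSON in one part file, so first() returns the complete metadata document on load.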
@@ -129,9 +129,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -144,8 +142,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

7 changes: 2 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -94,9 +94,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadata = ("class" -> instance.getClass.getName) ~
("ratingCol" -> instance.ratingCol)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.alsModel.save(modelPath)
}
@@ -109,8 +107,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val modelPath = new Path(path, "model").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val ratingCol = (rMetadata \ "ratingCol").extract[String]
val alsModel = ALSModel.load(modelPath)
@@ -120,9 +120,7 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -135,8 +133,7 @@ private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
@@ -131,9 +131,7 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -146,8 +144,7 @@ private[r] object DecisionTreeClassifierWrapper extends MLReadable[DecisionTreeC
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -114,9 +114,7 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -129,8 +127,7 @@ private[r] object DecisionTreeRegressorWrapper extends MLReadable[DecisionTreeRe
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -151,9 +151,7 @@ private[r] object FMClassifierWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -166,8 +164,7 @@ private[r] object FMClassifierWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -132,9 +132,7 @@ private[r] object FMRegressorWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -147,8 +145,7 @@ private[r] object FMRegressorWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -78,9 +78,7 @@ private[r] object FPGrowthWrapper extends MLReadable[FPGrowthWrapper] {
"class" -> instance.getClass.getName
))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.fpGrowthModel.save(modelPath)
}
@@ -138,9 +138,7 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -153,8 +151,7 @@ private[r] object GBTClassifierWrapper extends MLReadable[GBTClassifierWrapper]
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -122,9 +122,7 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -137,8 +135,7 @@ private[r] object GBTRegressorWrapper extends MLReadable[GBTRegressorWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val formula = (rMetadata \ "formula").extract[String]
val features = (rMetadata \ "features").extract[Array[String]]
@@ -113,9 +113,7 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
("logLikelihood" -> instance.logLikelihood)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -128,8 +126,7 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val dim = (rMetadata \ "dim").extract[Int]
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
@@ -170,9 +170,7 @@ private[r] object GeneralizedLinearRegressionWrapper
("rAic" -> instance.rAic) ~
("rNumIterations" -> instance.rNumIterations)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -186,8 +184,7 @@ private[r] object GeneralizedLinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
@@ -99,9 +99,7 @@ private[r] object IsotonicRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -114,8 +112,7 @@ private[r] object IsotonicRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -123,9 +123,7 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
("size" -> instance.size.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))

-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
instance.pipeline.save(pipelinePath)
}
}
@@ -138,8 +136,7 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
val pipelinePath = new Path(path, "pipeline").toString
val pipeline = PipelineModel.load(pipelinePath)

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val size = (rMetadata \ "size").extract[Array[Long]]
7 changes: 2 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/r/LDAWrapper.scala
@@ -198,9 +198,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
("logPerplexity" -> instance.logPerplexity) ~
("vocabulary" -> instance.vocabulary.toList)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -213,8 +211,7 @@ private[r] object LDAWrapper extends MLReadable[LDAWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val logLikelihood = (rMetadata \ "logLikelihood").extract[Double]
val logPerplexity = (rMetadata \ "logPerplexity").extract[Double]
@@ -127,9 +127,7 @@ private[r] object LinearRegressionWrapper
val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -142,8 +140,7 @@ private[r] object LinearRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]

@@ -137,9 +137,7 @@ private[r] object LinearSVCWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -152,8 +150,7 @@ private[r] object LinearSVCWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -192,9 +192,7 @@ private[r] object LogisticRegressionWrapper
("features" -> instance.features.toImmutableArraySeq) ~
("labels" -> instance.labels.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -207,8 +205,7 @@ private[r] object LogisticRegressionWrapper
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]
@@ -142,9 +142,7 @@ private[r] object MultilayerPerceptronClassifierWrapper

val rMetadata = "class" -> instance.getClass.getName
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -102,9 +102,7 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
("labels" -> instance.labels.toImmutableArraySeq) ~
("features" -> instance.features.toImmutableArraySeq)
val rMetadataJson: String = compact(render(rMetadata))
-sparkSession.createDataFrame(
-  Seq(Tuple1(rMetadataJson))
-).repartition(1).write.text(rMetadataPath)
+sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
@@ -117,8 +115,7 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

-val rMetadataStr = sparkSession.read.text(rMetadataPath)
-  .first().getString(0)
+val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val labels = (rMetadata \ "labels").extract[Array[String]]
val features = (rMetadata \ "features").extract[Array[String]]
