From fbb19d7dc473da37fc0e0f96516ff68e09166282 Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Mon, 13 Aug 2018 13:49:52 -0700
Subject: [PATCH] OS-neutral filesystem path creation

---
 .../com/salesforce/op/cli/SchemaSource.scala  |  3 +--
 .../op/cli/gen/templates/SimpleProject.scala  | 12 ++++++----
 .../salesforce/op/cli/CliFullCycleTest.scala  |  2 +-
 .../com/salesforce/op/cli/CliTestBase.scala   |  9 +++----
 .../com/salesforce/op/cli/gen/OpsTest.scala   | 11 +++++----
 .../com/salesforce/op/OpWorkflowRunner.scala  |  3 ++-
 .../com/salesforce/op/OpWorkflowCVTest.scala  |  7 +++---
 .../salesforce/op/OpWorkflowRunnerTest.scala  | 24 +++++++++----------
 .../op/utils/spark/RichDatasetTest.scala      | 13 +++++-----
 .../op/test/PassengerSparkFixtureTest.scala   | 13 ++++------
 .../com/salesforce/op/test/TestCommon.scala   |  5 +++-
 .../op/utils/io/avro/AvroInOutTest.scala      |  8 ++++---
 .../op/utils/spark/RichRDDTest.scala          |  4 ++--
 13 files changed, 61 insertions(+), 53 deletions(-)

diff --git a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala
index 1254ab62df..ee863c8a81 100644
--- a/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala
+++ b/cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala
@@ -42,7 +42,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
 /**
  * A variety of functionalities for pulling data schema.
@@ -147,7 +146,7 @@ case class AutomaticSchema(recordClassName: String)(dataFile: File) extends Sche
   override def schemaFile: File = {
     val goodName = recordClassName.capitalize
-    val file = new File(s"/tmp/$goodName.avsc")
+    val file = File.createTempFile(goodName, ".avsc")
     val out = new FileWriter(file)
     val s = dataSchema.toString(true).replace(defaultName, goodName)
     out.write(s)
diff --git a/cli/src/main/scala/com/salesforce/op/cli/gen/templates/SimpleProject.scala b/cli/src/main/scala/com/salesforce/op/cli/gen/templates/SimpleProject.scala
index 55559862ec..62bd898e26 100644
--- a/cli/src/main/scala/com/salesforce/op/cli/gen/templates/SimpleProject.scala
+++ b/cli/src/main/scala/com/salesforce/op/cli/gen/templates/SimpleProject.scala
@@ -31,8 +31,9 @@ package com.salesforce.op.cli.gen.templates
 
 import java.io.File
+import java.nio.file.Paths
 
-import com.salesforce.op.cli.{AvroSchemaFromFile, GeneratorConfig}
+import com.salesforce.op.cli.GeneratorConfig
 import com.salesforce.op.cli.gen.FileGenerator.Substitutions
 import com.salesforce.op.cli.gen._
 
@@ -77,10 +78,11 @@
 
     val mainClass = s"'com.salesforce.app.${conf.projName}'"
 
-    val modelLocation = new File(conf.projectDirectory, "build/spark/model")
-    val scoreLocation = new File(conf.projectDirectory, "build/spark/scores")
-    val evalLocation = new File(conf.projectDirectory, "build/spark/eval")
-    val metricsLocation = new File(conf.projectDirectory, "build/spark/metrics")
+    val projectBuildRoot = Paths.get(conf.projectDirectory.toString, "build", "spark").toString
+    val modelLocation = Paths.get(projectBuildRoot, "model").toFile
+    val scoreLocation = Paths.get(projectBuildRoot, "scores").toFile
+    val evalLocation = Paths.get(projectBuildRoot, "eval").toFile
+    val metricsLocation = Paths.get(projectBuildRoot, "metrics").toFile
 
     val readerChoice = schema.theReader
 
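The two hunks above show the patch's core idiom: instead of embedding a Unix '/' in string literals such as "build/spark/model", path segments are handed to java.nio.file.Paths.get, which joins them with the running platform's separator; likewise, hard-coded "/tmp" files give way to File.createTempFile. A minimal standalone sketch of the Paths.get pattern (the "sample-project" name is a placeholder, not a directory from this repository):

    import java.io.File
    import java.nio.file.Paths

    object BuildPathsDemo extends App {
      val projectDirectory = new File("sample-project") // placeholder project dir
      // Join segments rather than hard-coding '/' between them
      val projectBuildRoot: String = Paths.get(projectDirectory.toString, "build", "spark").toString
      val modelLocation: File = Paths.get(projectBuildRoot, "model").toFile
      println(modelLocation.getPath) // sample-project/build/spark/model on Unix; '\' on Windows
    }
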
diff --git a/cli/src/test/scala/com/salesforce/op/cli/CliFullCycleTest.scala b/cli/src/test/scala/com/salesforce/op/cli/CliFullCycleTest.scala
index 8880ffac00..286a9e40b3 100644
--- a/cli/src/test/scala/com/salesforce/op/cli/CliFullCycleTest.scala
+++ b/cli/src/test/scala/com/salesforce/op/cli/CliFullCycleTest.scala
@@ -108,7 +108,7 @@
     cmdSh.write(cmdStr)
     cmdSh.close()
 
-    val proc = Process("sh" :: "cmd" :: Nil, projectDir.getAbsoluteFile)
+    val proc = Process("sh" :: "cmd" :: Nil, new File(projectDir).getAbsoluteFile)
     val logger = ProcessLogger(s => log.info(s), s => log.error(s))
     val code = proc !< logger
 
diff --git a/cli/src/test/scala/com/salesforce/op/cli/CliTestBase.scala b/cli/src/test/scala/com/salesforce/op/cli/CliTestBase.scala
index 9742269d37..edcf4ba91d 100644
--- a/cli/src/test/scala/com/salesforce/op/cli/CliTestBase.scala
+++ b/cli/src/test/scala/com/salesforce/op/cli/CliTestBase.scala
@@ -31,6 +31,7 @@ package com.salesforce.op.cli
 
 import java.io.{ByteArrayOutputStream, File, StringReader}
+import java.nio.file.Paths
 
 import com.salesforce.op.OpWorkflowRunType
 import com.salesforce.op.test.TestCommon
@@ -108,17 +109,17 @@ class CliTestBase extends FlatSpec with TestCommon with Assertions with BeforeAn
 
   val expectedSourceFiles = "Features.scala" :: s"$ProjectName.scala"::Nil
 
-  val projectDir = new File(ProjectName.toLowerCase)
+  val projectDir = ProjectName.toLowerCase
 
   def checkAvroFile(source: File): Unit = {
-    val avroFile = new File(projectDir, s"src/main/avro/${source.getName}")
+    val avroFile = Paths.get(projectDir, "src", "main", "avro", source.getName).toFile
     avroFile should exist
     Source.fromFile(avroFile).getLines.mkString("\n") shouldBe
       Source.fromFile(source).getLines.mkString("\n")
   }
 
   def checkScalaFiles(shouldNotContain: String): Unit = {
-    val srcDir = new File(projectDir, "src/main/scala/com/salesforce/app")
+    val srcDir = Paths.get(projectDir, "src", "main", "scala", "com", "salesforce", "app").toFile
     srcDir should exist
 
     for {
@@ -136,7 +137,7 @@ class CliTestBase extends FlatSpec with TestCommon with Assertions with BeforeAn
 
   def findFile(relPath: String): String = {
     Option(new File(relPath)) filter (_.exists) orElse
-      Option(new File(new File(".."), relPath)) filter (_.exists) getOrElse {
+      Option(Paths.get("fake-rel-path").relativize(Paths.get(relPath)).toFile) filter (_.exists) getOrElse {
       throw new UnsupportedOperationException(
         s"Could not find file $relPath, current is ${new File(".").getAbsolutePath}")
     } getAbsolutePath
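The relativize call introduced in findFile above is easy to misread: relativizing from a single-segment sibling (the literal "fake-rel-path", which never has to exist) yields the parent-relative path ../<relPath> with the platform's separator, replacing new File(new File(".."), relPath). A standalone sketch, using a hypothetical file name:

    import java.nio.file.Paths

    object RelativizeDemo extends App {
      val relPath = "settings.gradle" // hypothetical file to find one level up
      // Pure path arithmetic: neither operand needs to exist on disk
      val inParent = Paths.get("fake-rel-path").relativize(Paths.get(relPath))
      println(inParent) // ../settings.gradle on Unix, ..\settings.gradle on Windows
    }
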
"src", "main", "avro", "PassengerCSV.avsc").toFile)), + answersFile = Some(new File("cli", "passengers.answers")), overwrite = true).values Spec[Ops] should "generate project files" in { @@ -72,8 +73,8 @@ class OpsTest extends FlatSpec with TestCommon with Assertions { buildFileContent should include("credentials artifactoryCredentials") - val scalaSourcesFolder = new File(projectFolder, "src/main/scala/com/salesforce/app") - val featuresFile = Source.fromFile(new File(scalaSourcesFolder, "Features.scala")).getLines + val scalaSourcesFolder = Paths.get(projectFolder.toString, "src", "main", "scala", "com", "salesforce", "app") + val featuresFile = Source.fromFile(new File(scalaSourcesFolder.toFile, "Features.scala")).getLines val heightLine = featuresFile.find(_ contains "height") map (_.trim) heightLine shouldBe Some( "val height = FeatureBuilder.Real[PassengerCSV].extract(o => o.getHeight.toReal).asPredictor" diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowRunner.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowRunner.scala index 198010e895..e3f5277349 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowRunner.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowRunner.scala @@ -31,6 +31,7 @@ package com.salesforce.op import java.io.File +import java.nio.file.Paths import com.github.fommil.netlib.{BLAS, LAPACK} import com.salesforce.op.evaluators.{EvaluationMetrics, OpEvaluatorBase} @@ -234,7 +235,7 @@ class OpWorkflowRunner "The streamingScore method requires an streaming score reader to be specified") // Prepare write path - def writePath(timeInMs: Long) = Some(s"${params.writeLocation.get.stripSuffix("/")}/$timeInMs") + def writePath(timeInMs: Long) = Some(Paths.get(params.writeLocation.get, timeInMs.toString)) // Load the model to score with and prepare the scoring function val workflowModel = workflow.loadModel(params.modelLocation.get).setParameters(params) diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala index dfce4637cb..6e09bdd72f 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala @@ -30,6 +30,8 @@ package com.salesforce.op +import java.nio.file.Paths + import com.salesforce.app.schema.PassengerDataAll import com.salesforce.op.evaluators._ import com.salesforce.op.features._ @@ -41,7 +43,7 @@ import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry._ import com.salesforce.op.stages.impl.classification._ import com.salesforce.op.stages.impl.preparators.SanityChecker import com.salesforce.op.stages.impl.regression.{LossType, RegressionModelSelector, RegressionModelsToTry} -import com.salesforce.op.stages.impl.selector.{ModelSelectorBase, ModelSelectorBaseNames} +import com.salesforce.op.stages.impl.selector.ModelSelectorBase import com.salesforce.op.stages.impl.tuning._ import com.salesforce.op.test.PassengerSparkFixtureTest import org.apache.spark.ml.PipelineStage @@ -50,7 +52,6 @@ import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import com.salesforce.op.utils.spark.RichRow._ import org.slf4j.LoggerFactory @RunWith(classOf[JUnitRunner]) @@ -60,7 +61,7 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest { trait PassenserCSVforCV { val simplePassengerForCV = DataReaders.Simple.csv[PassengerDataAll]( - path = 
Some(s"$testDataPath/PassengerDataAll.csv"), + path = Some(Paths.get(testDataDir, "PassengerDataAll.csv").toString), schema = PassengerDataAll.getClassSchema.toString, key = _.getPassengerId.toString ) diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala index dd5c67ee69..d03fa73a30 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala @@ -31,13 +31,14 @@ package com.salesforce.op import java.io.File +import java.nio.file.Paths import com.salesforce.op.OpWorkflowRunType._ import com.salesforce.op.evaluators.{BinaryClassificationMetrics, Evaluators} import com.salesforce.op.features.types._ import com.salesforce.op.readers.DataFrameFieldNames._ +import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry.LogisticRegression -import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSelector, OpLogisticRegression} import com.salesforce.op.test.{PassengerSparkFixtureTest, TestSparkStreamingContext} import com.salesforce.op.utils.spark.AppMetrics import com.salesforce.op.utils.spark.RichDataset._ @@ -47,7 +48,6 @@ import org.scalactic.source import org.scalatest.AsyncFlatSpec import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory -import org.apache.log4j.Level import scala.collection.JavaConverters._ import scala.concurrent.Promise @@ -60,7 +60,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec val log = LoggerFactory.getLogger(this.getClass) - val thisDir = new File("resources/tmp/OpWorkflowRunnerTest/").getCanonicalFile + val thisDir = Paths.get("resources", "tmp", "OpWorkflowRunnerTest").toFile.getCanonicalFile override def beforeAll: Unit = try deleteRecursively(thisDir) finally super.beforeAll override def afterAll: Unit = try deleteRecursively(thisDir) finally super.afterAll @@ -133,8 +133,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "train a workflow and write the trained model" in { - lazy val modelLocation = new File(thisDir + "/op-runner-test-model") - lazy val modelMetricsLocation = new File(thisDir + "/op-runner-test-metrics/train") + lazy val modelLocation = new File(thisDir, "op-runner-test-model") + lazy val modelMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "train").toFile val runConfig = testConfig.copy( runType = Train, @@ -146,8 +146,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "score a dataset with a trained model" in { - val scoresLocation = new File(thisDir + "/op-runner-test-write/score") - val scoringMetricsLocation = new File(thisDir + "/op-runner-test-metrics/score") + val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "score").toFile + val scoringMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "score").toFile val runConfig = testConfig.copy( runType = Score, @@ -162,13 +162,13 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "streaming score a dataset with a trained model" in { - val readLocation = new File(thisDir + "/op-runner-test-read/streaming-score") - val scoresLocation = new File(thisDir + "/op-runner-test-write/streaming-score") + val readLocation = Paths.get(thisDir.toString, "op-runner-test-read", "streaming-score").toFile + val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", 
"streaming-score").toFile // Prepare streaming input data FileUtils.forceMkdir(readLocation) val passengerAvroFile = new File(passengerAvroPath).getCanonicalFile - FileUtils.copyFile(passengerAvroFile, new File(readLocation + "/" + passengerAvroFile.getName), false) + FileUtils.copyFile(passengerAvroFile, new File(readLocation, passengerAvroFile.getName), false) val runConfig = testConfig.copy( runType = StreamingScore, @@ -185,7 +185,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "evaluate a dataset with a trained model" in { - val metricsLocation = new File(thisDir + "/op-runner-test-metrics/eval") + val metricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "eval").toFile val runConfig = testConfig.copy( runType = Evaluate, @@ -196,7 +196,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "compute features upto with a workflow" in { - lazy val featuresLocation = new File(thisDir + "/op-runner-test-write/features") + lazy val featuresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "features").toFile val runConfig = testConfig.copy( runType = Features, diff --git a/features/src/test/scala/com/salesforce/op/utils/spark/RichDatasetTest.scala b/features/src/test/scala/com/salesforce/op/utils/spark/RichDatasetTest.scala index 59d3546997..e5af1eae92 100644 --- a/features/src/test/scala/com/salesforce/op/utils/spark/RichDatasetTest.scala +++ b/features/src/test/scala/com/salesforce/op/utils/spark/RichDatasetTest.scala @@ -30,35 +30,36 @@ package com.salesforce.op.utils.spark -import language.postfixOps import java.io.File import com.salesforce.op.features.Feature import com.salesforce.op.features.types._ -import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} import com.salesforce.op.test.SparkMatchers._ +import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} +import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset} import org.joda.time.DateTime import org.junit.runner.RunWith +import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, Matchers} -import org.apache.spark.ml.linalg.Vectors import org.slf4j.LoggerFactory +import scala.language.postfixOps + @RunWith(classOf[JUnitRunner]) class RichDatasetTest extends FlatSpec with TestSparkContext { // TODO: fix implicit scope conflicts with 'org.apache.spark.sql.functions._' import com.salesforce.op.utils.spark.RichDataType._ - import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.spark.RichDataset._ + import com.salesforce.op.utils.spark.RichMetadata._ val log = LoggerFactory.getLogger(this.getClass) - lazy val savedPath = new File(tempDir + "/richDS-" + DateTime.now().getMillis) + lazy val savedPath = new File(tempDir, "richDS-" + DateTime.now().getMillis) private val data = Seq[(Integral, Real, Text, Binary, Real)]( diff --git a/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala b/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala index ae58ed26fa..a021a53961 100644 --- a/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala +++ b/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala @@ -30,7 +30,7 @@ package com.salesforce.op.test -import java.io.File +import java.nio.file.Paths import com.salesforce.op.aggregators.CutOffTime import com.salesforce.op.readers._ @@ -42,13 
diff --git a/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala b/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala
index ae58ed26fa..a021a53961 100644
--- a/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala
+++ b/readers/src/main/scala/com/salesforce/op/test/PassengerSparkFixtureTest.scala
@@ -30,7 +30,7 @@
 
 package com.salesforce.op.test
 
-import java.io.File
+import java.nio.file.Paths
 
 import com.salesforce.op.aggregators.CutOffTime
 import com.salesforce.op.readers._
@@ -42,13 +42,10 @@ import scala.language.postfixOps
 
 trait PassengerSparkFixtureTest extends TestSparkContext with PassengerFeaturesTest {
   self: Suite =>
 
-  def testDataPath: String = {
-    Some(new File("test-data")) filter (_.isDirectory) getOrElse new File("../test-data") getPath
-  }
-  def passengerAvroPath: String = s"$testDataPath/PassengerData.avro"
-  def passengerCsvPath: String = s"$testDataPath/PassengerData.csv"
-  def passengerCsvWithHeaderPath: String = s"$testDataPath/PassengerDataWithHeader.csv"
-  def passengerProfileCsvPath: String = s"$testDataPath/PassengerProfileData.csv"
+  def passengerAvroPath: String = Paths.get(testDataDir, "PassengerData.avro").toString
+  def passengerCsvPath: String = Paths.get(testDataDir, "PassengerData.csv").toString
+  def passengerCsvWithHeaderPath: String = Paths.get(testDataDir, "PassengerDataWithHeader.csv").toString
+  def passengerProfileCsvPath: String = Paths.get(testDataDir, "PassengerProfileData.csv").toString
 
   lazy val simpleReader: AvroReader[Passenger] = DataReaders.Simple.avro[Passenger](
diff --git a/utils/src/main/scala/com/salesforce/op/test/TestCommon.scala b/utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
index 80df6791ea..aced13de53 100644
--- a/utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
+++ b/utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
@@ -31,6 +31,7 @@ package com.salesforce.op.test
 
 import java.io.File
+import java.nio.file.Paths
 
 import org.apache.log4j.{Level, LogManager, Logger}
 import org.scalatest._
 
@@ -72,7 +73,9 @@ trait TestCommon extends Matchers with Assertions {
    * @return directory path
    */
   def testDataDir: String = {
-    Some(new File("test-data")) filter (_.isDirectory) getOrElse new File("../test-data") getPath
+    Some(new File("test-data"))
+      .collect{ case d if d.isDirectory => d.getPath}
+      .getOrElse(Paths.get("test-data-sibling").relativize(Paths.get("test-data")).toString)
   }
 
   /**
diff --git a/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala b/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala
index 4ee5ee7024..601b5cabc8 100644
--- a/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala
+++ b/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala
@@ -31,6 +31,7 @@ package com.salesforce.op.utils.io.avro
 
 import java.io.{File, FileNotFoundException, FileWriter}
+import java.nio.file.Paths
 
 import com.salesforce.op.test.TestSparkContext
 import com.salesforce.op.utils.io.avro.AvroInOut._
 
@@ -96,14 +97,15 @@ class AvroInOutTest extends FlatSpec with TestSparkContext {
   }
 
   it should "checkPathsExist" in {
-    val f1 = new File("/tmp/avroinouttest")
+    val tmpDir = Paths.get(File.separator, "tmp").toFile
+    val f1 = new File(tmpDir, "avroinouttest")
     f1.delete()
     val w = new FileWriter(f1)
     w.write("just checking")
     w.close()
-    val f2 = new File("/tmp/thisfilecannotexist")
+    val f2 = new File(tmpDir, "thisfilecannotexist")
     f2.delete()
-    val f3 = new File("/tmp/this file cannot exist")
+    val f3 = new File(tmpDir, "this file cannot exist")
     f3.delete()
     assume(f1.exists && !f2.exists && !f3.exists)
 
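Where a scratch file really must exist, File.createTempFile (used in SchemaSource above) is the fully portable route; the Paths.get(File.separator, "tmp") form in this test still targets a Unix-style /tmp, but at least builds the path without a literal "/tmp" string. A sketch of the createTempFile route, assuming default java.io.tmpdir settings:

    import java.io.{File, FileWriter}

    object TempFileDemo extends App {
      val f = File.createTempFile("avroinouttest", ".txt") // lands in java.io.tmpdir
      f.deleteOnExit()
      val w = new FileWriter(f)
      w.write("just checking")
      w.close()
      println(s"${f.getAbsolutePath} exists: ${f.exists}")
    }
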
diff --git a/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala b/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala
index b06ba7c43f..0a35e7de0a 100644
--- a/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala
+++ b/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala
@@ -52,14 +52,14 @@ class RichRDDTest extends PropSpec with PropertyChecks with TestSparkContext {
   property("save as a text file") {
     forAll(data) { rdd =>
-      val out = new File(tempDir + "/op-richrdd-" + DateTime.now().getMillis).toString
+      val out = new File(tempDir, "op-richrdd-" + DateTime.now().getMillis).toString
       rdd.saveAsTextFile(out, None, new JobConf(rdd.context.hadoopConfiguration))
       spark.read.textFile(out).count() shouldBe rdd.count()
     }
   }
 
   property("save as a compressed text file") {
     forAll(data) { rdd =>
-      val out = new File(tempDir + "/op-richrdd-" + DateTime.now().getMillis).toString
+      val out = new File(tempDir, "op-richrdd-" + DateTime.now().getMillis).toString
       rdd.saveAsTextFile(out, Some(classOf[DefaultCodec]), new JobConf(rdd.context.hadoopConfiguration))
       spark.read.textFile(out).count() shouldBe rdd.count()
     }
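Taken together, the replacements in this patch follow one rule: let java.nio.file or the multi-argument File constructors supply the separator. A closing sketch showing that the two idioms the patch uses, plus an explicit File.separator join, agree on every platform:

    import java.io.File
    import java.nio.file.Paths

    object EquivalenceDemo extends App {
      val viaPaths = Paths.get("build", "spark", "model").toString
      val viaFileCtor = new File(new File("build", "spark"), "model").getPath
      val viaSeparator = List("build", "spark", "model").mkString(File.separator)
      assert(viaPaths == viaFileCtor && viaFileCtor == viaSeparator)
      println(viaPaths)
    }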