Skip to content

Commit

Permalink
OS-neutral filesystem path creation
Browse files Browse the repository at this point in the history
  • Loading branch information
gerashegalov committed Aug 13, 2018
1 parent f379975 commit fbb19d7
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 53 deletions.
3 changes: 1 addition & 2 deletions cli/src/main/scala/com/salesforce/op/cli/SchemaSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._
import scala.util.Try

/**
* A variety of functionalities for pulling data schema.
Expand Down Expand Up @@ -147,7 +146,7 @@ case class AutomaticSchema(recordClassName: String)(dataFile: File) extends Sche

override def schemaFile: File = {
val goodName = recordClassName.capitalize
// Create the schema file in the system temp dir (OS-neutral, unlike the old hard-coded "/tmp" path)
val file = File.createTempFile(goodName, ".avsc")
val out = new FileWriter(file)
val s = dataSchema.toString(true).replace(defaultName, goodName)
out.write(s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
package com.salesforce.op.cli.gen.templates

import java.io.File
import java.nio.file.Paths

import com.salesforce.op.cli.{AvroSchemaFromFile, GeneratorConfig}
import com.salesforce.op.cli.GeneratorConfig
import com.salesforce.op.cli.gen.FileGenerator.Substitutions
import com.salesforce.op.cli.gen._

Expand Down Expand Up @@ -77,10 +78,11 @@ case class SimpleProject(ops: Ops) extends ProjectGenerator {

val mainClass = s"'com.salesforce.app.${conf.projName}'"

// Output locations under <project>/build/spark, built OS-neutrally via Paths.get
val projectBuildRoot = Paths.get(conf.projectDirectory.toString, "build", "spark").toString
val modelLocation = Paths.get(projectBuildRoot, "model").toFile
val scoreLocation = Paths.get(projectBuildRoot, "scores").toFile
val evalLocation = Paths.get(projectBuildRoot, "eval").toFile
val metricsLocation = Paths.get(projectBuildRoot, "metrics").toFile

val readerChoice = schema.theReader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class CliFullCycleTest extends CliTestBase {
cmdSh.write(cmdStr)
cmdSh.close()

// projectDir is now a String; wrap it in a File to supply the shell's working directory
val proc = Process("sh" :: "cmd" :: Nil, new File(projectDir).getAbsoluteFile)
val logger = ProcessLogger(s => log.info(s), s => log.error(s))
val code = proc !< logger

Expand Down
9 changes: 5 additions & 4 deletions cli/src/test/scala/com/salesforce/op/cli/CliTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package com.salesforce.op.cli

import java.io.{ByteArrayOutputStream, File, StringReader}
import java.nio.file.Paths

import com.salesforce.op.OpWorkflowRunType
import com.salesforce.op.test.TestCommon
Expand Down Expand Up @@ -108,17 +109,17 @@ class CliTestBase extends FlatSpec with TestCommon with Assertions with BeforeAn

val expectedSourceFiles = "Features.scala" :: s"$ProjectName.scala"::Nil

// Kept as a String (not a File) so it can be combined OS-neutrally with Paths.get below
val projectDir = ProjectName.toLowerCase

def checkAvroFile(source: File): Unit = {
  // The generated avro schema must exist and match the source file line-for-line
  val avroFile = Paths.get(projectDir, "src", "main", "avro", source.getName).toFile
  avroFile should exist
  Source.fromFile(avroFile).getLines.mkString("\n") shouldBe
  Source.fromFile(source).getLines.mkString("\n")
}

def checkScalaFiles(shouldNotContain: String): Unit = {
// Generated scala sources root, built OS-neutrally
val srcDir = Paths.get(projectDir, "src", "main", "scala", "com", "salesforce", "app").toFile
srcDir should exist

for {
Expand All @@ -136,7 +137,7 @@ class CliTestBase extends FlatSpec with TestCommon with Assertions with BeforeAn

def findFile(relPath: String): String = {
Option(new File(relPath)) filter (_.exists) orElse
Option(new File(new File(".."), relPath)) filter (_.exists) getOrElse {
Option(Paths.get("fake-rel-path").relativize(Paths.get(relPath)).toFile) filter (_.exists) getOrElse {
throw new UnsupportedOperationException(
s"Could not find file $relPath, current is ${new File(".").getAbsolutePath}")
} getAbsolutePath
Expand Down
11 changes: 6 additions & 5 deletions cli/src/test/scala/com/salesforce/op/cli/gen/OpsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package com.salesforce.op.cli.gen

import java.io.File
import java.nio.file.Paths

import com.salesforce.op.cli.{AvroSchemaFromFile, CliParameters, GeneratorConfig}
import com.salesforce.op.test.TestCommon
Expand All @@ -50,11 +51,11 @@ class OpsTest extends FlatSpec with TestCommon with Assertions {
// CLI test parameters; all file paths constructed OS-neutrally via Paths.get / File(parent, child)
val testParams = CliParameters(
  location = tempFolder,
  projName = "cli_test",
  inputFile = Some(Paths.get("templates", "simple", "src", "main", "resources", "PassengerData.csv").toFile),
  response = Some("survived"),
  idField = Some("passengerId"),
  schemaSource = Some(AvroSchemaFromFile(Paths.get("utils", "src", "main", "avro", "PassengerCSV.avsc").toFile)),
  answersFile = Some(new File("cli", "passengers.answers")),
  overwrite = true).values

Spec[Ops] should "generate project files" in {
Expand All @@ -72,8 +73,8 @@ class OpsTest extends FlatSpec with TestCommon with Assertions {

buildFileContent should include("credentials artifactoryCredentials")

// Generated sources root as a Path; converted to File only where the java.io API needs one
val scalaSourcesFolder = Paths.get(projectFolder.toString, "src", "main", "scala", "com", "salesforce", "app")
val featuresFile = Source.fromFile(new File(scalaSourcesFolder.toFile, "Features.scala")).getLines
val heightLine = featuresFile.find(_ contains "height") map (_.trim)
heightLine shouldBe Some(
"val height = FeatureBuilder.Real[PassengerCSV].extract(o => o.getHeight.toReal).asPredictor"
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/com/salesforce/op/OpWorkflowRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package com.salesforce.op

import java.io.File
import java.nio.file.Paths

import com.github.fommil.netlib.{BLAS, LAPACK}
import com.salesforce.op.evaluators.{EvaluationMetrics, OpEvaluatorBase}
Expand Down Expand Up @@ -234,7 +235,7 @@ class OpWorkflowRunner
"The streamingScore method requires an streaming score reader to be specified")

// Prepare write path
// Prepare write path. `.toString` preserves the original Option[String] contract for callers;
// Paths.get also normalizes any trailing separator the old stripSuffix("/") handled.
def writePath(timeInMs: Long) = Some(Paths.get(params.writeLocation.get, timeInMs.toString).toString)

// Load the model to score with and prepare the scoring function
val workflowModel = workflow.loadModel(params.modelLocation.get).setParameters(params)
Expand Down
7 changes: 4 additions & 3 deletions core/src/test/scala/com/salesforce/op/OpWorkflowCVTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

package com.salesforce.op

import java.nio.file.Paths

import com.salesforce.app.schema.PassengerDataAll
import com.salesforce.op.evaluators._
import com.salesforce.op.features._
Expand All @@ -41,7 +43,7 @@ import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry._
import com.salesforce.op.stages.impl.classification._
import com.salesforce.op.stages.impl.preparators.SanityChecker
import com.salesforce.op.stages.impl.regression.{LossType, RegressionModelSelector, RegressionModelsToTry}
import com.salesforce.op.stages.impl.selector.{ModelSelectorBase, ModelSelectorBaseNames}
import com.salesforce.op.stages.impl.selector.ModelSelectorBase
import com.salesforce.op.stages.impl.tuning._
import com.salesforce.op.test.PassengerSparkFixtureTest
import org.apache.spark.ml.PipelineStage
Expand All @@ -50,7 +52,6 @@ import org.apache.spark.sql.DataFrame
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import com.salesforce.op.utils.spark.RichRow._
import org.slf4j.LoggerFactory

@RunWith(classOf[JUnitRunner])
Expand All @@ -60,7 +61,7 @@ class OpWorkflowCVTest extends FlatSpec with PassengerSparkFixtureTest {

trait PassenserCSVforCV {
val simplePassengerForCV = DataReaders.Simple.csv[PassengerDataAll](
path = Some(Paths.get(testDataDir, "PassengerDataAll.csv").toString),
schema = PassengerDataAll.getClassSchema.toString,
key = _.getPassengerId.toString
)
Expand Down
24 changes: 12 additions & 12 deletions core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@
package com.salesforce.op

import java.io.File
import java.nio.file.Paths

import com.salesforce.op.OpWorkflowRunType._
import com.salesforce.op.evaluators.{BinaryClassificationMetrics, Evaluators}
import com.salesforce.op.features.types._
import com.salesforce.op.readers.DataFrameFieldNames._
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import com.salesforce.op.stages.impl.classification.ClassificationModelsToTry.LogisticRegression
import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSelector, OpLogisticRegression}
import com.salesforce.op.test.{PassengerSparkFixtureTest, TestSparkStreamingContext}
import com.salesforce.op.utils.spark.AppMetrics
import com.salesforce.op.utils.spark.RichDataset._
Expand All @@ -47,7 +48,6 @@ import org.scalactic.source
import org.scalatest.AsyncFlatSpec
import org.scalatest.junit.JUnitRunner
import org.slf4j.LoggerFactory
import org.apache.log4j.Level

import scala.collection.JavaConverters._
import scala.concurrent.Promise
Expand All @@ -60,7 +60,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec

val log = LoggerFactory.getLogger(this.getClass)

// Canonical, OS-neutral scratch directory for this suite's outputs
val thisDir = Paths.get("resources", "tmp", "OpWorkflowRunnerTest").toFile.getCanonicalFile

override def beforeAll: Unit = try deleteRecursively(thisDir) finally super.beforeAll
override def afterAll: Unit = try deleteRecursively(thisDir) finally super.afterAll
Expand Down Expand Up @@ -133,8 +133,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec
}

it should "train a workflow and write the trained model" in {
// File(parent, child) / Paths.get are OS-neutral, unlike string concatenation with "/"
lazy val modelLocation = new File(thisDir, "op-runner-test-model")
lazy val modelMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "train").toFile

val runConfig = testConfig.copy(
runType = Train,
Expand All @@ -146,8 +146,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec
}

it should "score a dataset with a trained model" in {
val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "score").toFile
val scoringMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "score").toFile

val runConfig = testConfig.copy(
runType = Score,
Expand All @@ -162,13 +162,13 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec
}

it should "streaming score a dataset with a trained model" in {
val readLocation = Paths.get(thisDir.toString, "op-runner-test-read", "streaming-score").toFile
val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "streaming-score").toFile

// Prepare streaming input data
FileUtils.forceMkdir(readLocation)
val passengerAvroFile = new File(passengerAvroPath).getCanonicalFile
// File(parent, child) avoids manual "/" concatenation
FileUtils.copyFile(passengerAvroFile, new File(readLocation, passengerAvroFile.getName), false)

val runConfig = testConfig.copy(
runType = StreamingScore,
Expand All @@ -185,7 +185,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec
}

it should "evaluate a dataset with a trained model" in {
val metricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "eval").toFile

val runConfig = testConfig.copy(
runType = Evaluate,
Expand All @@ -196,7 +196,7 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec
}

it should "compute features upto with a workflow" in {
lazy val featuresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "features").toFile

val runConfig = testConfig.copy(
runType = Features,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,36 @@

package com.salesforce.op.utils.spark

import language.postfixOps
import java.io.File

import com.salesforce.op.features.Feature
import com.salesforce.op.features.types._
import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import com.salesforce.op.test.SparkMatchers._
import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.joda.time.DateTime
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import org.apache.spark.ml.linalg.Vectors
import org.slf4j.LoggerFactory

import scala.language.postfixOps


@RunWith(classOf[JUnitRunner])
class RichDatasetTest extends FlatSpec with TestSparkContext {

// TODO: fix implicit scope conflicts with 'org.apache.spark.sql.functions._'
import com.salesforce.op.utils.spark.RichDataType._
import com.salesforce.op.utils.spark.RichMetadata._
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.RichMetadata._

val log = LoggerFactory.getLogger(this.getClass)

// File(parent, child) is OS-neutral; millis suffix keeps the path unique per run
lazy val savedPath = new File(tempDir, "richDS-" + DateTime.now().getMillis)

private val data =
Seq[(Integral, Real, Text, Binary, Real)](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

package com.salesforce.op.test

import java.io.File
import java.nio.file.Paths

import com.salesforce.op.aggregators.CutOffTime
import com.salesforce.op.readers._
Expand All @@ -42,13 +42,10 @@ import scala.language.postfixOps
trait PassengerSparkFixtureTest extends TestSparkContext with PassengerFeaturesTest {
self: Suite =>

// Test-data file paths built OS-neutrally from testDataDir
// (testDataDir is presumably inherited via TestCommon — confirm against the mixin chain)
def passengerAvroPath: String = Paths.get(testDataDir, "PassengerData.avro").toString
def passengerCsvPath: String = Paths.get(testDataDir, "PassengerData.csv").toString
def passengerCsvWithHeaderPath: String = Paths.get(testDataDir, "PassengerDataWithHeader.csv").toString
def passengerProfileCsvPath: String = Paths.get(testDataDir, "PassengerProfileData.csv").toString

lazy val simpleReader: AvroReader[Passenger] =
DataReaders.Simple.avro[Passenger](
Expand Down
5 changes: 4 additions & 1 deletion utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package com.salesforce.op.test

import java.io.File
import java.nio.file.Paths

import org.apache.log4j.{Level, LogManager, Logger}
import org.scalatest._
Expand Down Expand Up @@ -72,7 +73,9 @@ trait TestCommon extends Matchers with Assertions {
* @return directory path
*/
def testDataDir: String = {
  // Resolve the test-data directory: use ./test-data when running from the repo root,
  // otherwise fall back to the sibling ../test-data (when tests run from a submodule dir).
  // Paths.get("..", "test-data") is OS-neutral and clearer than the equivalent
  // relativize trick (which also yielded ".." + separator + "test-data").
  Some(new File("test-data"))
    .collect { case d if d.isDirectory => d.getPath }
    .getOrElse(Paths.get("..", "test-data").toString)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package com.salesforce.op.utils.io.avro

import java.io.{File, FileNotFoundException, FileWriter}
import java.nio.file.Paths

import com.salesforce.op.test.TestSparkContext
import com.salesforce.op.utils.io.avro.AvroInOut._
Expand Down Expand Up @@ -96,14 +97,15 @@ class AvroInOutTest extends FlatSpec with TestSparkContext {
}

it should "checkPathsExist" in {
// OS-neutral absolute temp root instead of the hard-coded "/tmp" string
val tmpDir = Paths.get(File.separator, "tmp").toFile
val f1 = new File(tmpDir, "avroinouttest")
f1.delete()
val w = new FileWriter(f1)
w.write("just checking")
w.close()
val f2 = new File(tmpDir, "thisfilecannotexist")
f2.delete()
val f3 = new File(tmpDir, "this file cannot exist")
f3.delete()
assume(f1.exists && !f2.exists && !f3.exists)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ class RichRDDTest extends PropSpec with PropertyChecks with TestSparkContext {

property("save as a text file") {
  forAll(data) { rdd =>
    // Unique, OS-neutral output dir per generated RDD
    val out = new File(tempDir, "op-richrdd-" + DateTime.now().getMillis).toString
    rdd.saveAsTextFile(out, None, new JobConf(rdd.context.hadoopConfiguration))
    // Round-trip: every record written must be read back
    spark.read.textFile(out).count() shouldBe rdd.count()
  }
}
property("save as a compressed text file") {
forAll(data) { rdd =>
val out = new File(tempDir, "op-richrdd-" + DateTime.now().getMillis).toString
rdd.saveAsTextFile(out, Some(classOf[DefaultCodec]), new JobConf(rdd.context.hadoopConfiguration))
spark.read.textFile(out).count() shouldBe rdd.count()
}
Expand Down

0 comments on commit fbb19d7

Please sign in to comment.