[SPARK-49249][SPARK-49122][Connect][SQL] Add addArtifact API to the Spark SQL Core #47631

Status: Closed · wants to merge 14 commits
@@ -390,77 +390,30 @@ class SparkSession private[sql] (
     execute(command)
   }

-  /**
-   * Add a single artifact to the client session.
-   *
-   * Currently only local files with extensions .jar and .class are supported.
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def addArtifact(path: String): Unit = client.addArtifact(path)
+  override def addArtifact(path: String): Unit = client.addArtifact(path)

-  /**
-   * Add a single artifact to the client session.
-   *
-   * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def addArtifact(uri: URI): Unit = client.addArtifact(uri)
+  override def addArtifact(uri: URI): Unit = client.addArtifact(uri)

-  /**
-   * Add a single in-memory artifact to the session while preserving the directory structure
-   * specified by `target` under the session's working directory of that particular file
-   * extension.
-   *
-   * Supported target file extensions are .jar and .class.
-   *
-   * ==Example==
-   * {{{
-   *   addArtifact(bytesBar, "foo/bar.class")
-   *   addArtifact(bytesFlat, "flat.class")
-   *   // Directory structure of the session's working directory for class files would look like:
-   *   // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
-   *   // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
-   * }}}
-   *
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def addArtifact(bytes: Array[Byte], target: String): Unit = client.addArtifact(bytes, target)
+  override def addArtifact(bytes: Array[Byte], target: String): Unit = {
+    client.addArtifact(bytes, target)
+  }

-  /**
-   * Add a single artifact to the session while preserving the directory structure specified by
-   * `target` under the session's working directory of that particular file extension.
-   *
-   * Supported target file extensions are .jar and .class.
-   *
-   * ==Example==
-   * {{{
-   *   addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
-   *   addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
-   *   // Directory structure of the session's working directory for class files would look like:
-   *   // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
-   *   // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
-   * }}}
-   *
-   * @since 4.0.0
-   */
+  /** @inheritdoc */
   @Experimental
-  def addArtifact(source: String, target: String): Unit = client.addArtifact(source, target)
+  override def addArtifact(source: String, target: String): Unit = {
+    client.addArtifact(source, target)
+  }

-  /**
-   * Add one or more artifacts to the session.
-   *
-   * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs
-   *
-   * @since 3.4.0
-   */
+  /** @inheritdoc */
   @Experimental
   @scala.annotation.varargs
-  def addArtifacts(uri: URI*): Unit = client.addArtifacts(uri)
+  override def addArtifacts(uri: URI*): Unit = client.addArtifacts(uri)

   /**
    * Register a ClassFinder for dynamically generated classes.
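To make the API surface concrete, here is a minimal usage sketch, not part of the diff; the session setup, file paths, and the Ivy coordinate below are illustrative assumptions:

```scala
import java.net.URI
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate() // hypothetical session setup

// Local files: only .jar and .class extensions are accepted.
spark.addArtifact("/tmp/artifacts/udfs.jar")

// URI form: local files plus Apache Ivy coordinates.
spark.addArtifact(new URI("ivy://org.apache.commons:commons-lang3:3.12.0"))

// In-memory bytes, laid out under the session's class-file working directory
// according to `target` (here: ${WORKING_DIR_FOR_CLASS_FILES}/foo/Bar.class).
val classBytes: Array[Byte] = java.nio.file.Files.readAllBytes(
  java.nio.file.Paths.get("/tmp/build/foo/Bar.class"))
spark.addArtifact(classBytes, "foo/Bar.class")
```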
@@ -22,6 +22,8 @@ import java.nio.file.Paths
 import ammonite.repl.api.Session
 import ammonite.runtime.SpecialClassLoader

+import org.apache.spark.sql.Artifact
+
 /**
  * A special [[ClassFinder]] for the Ammonite REPL to handle in-memory class files.
  *
@@ -30,6 +30,7 @@ import org.apache.commons.codec.digest.DigestUtils.sha256Hex
 import org.scalatest.BeforeAndAfterEach

 import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.Artifact
 import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
 import org.apache.spark.sql.test.ConnectFunSuite
 import org.apache.spark.util.IvyTestUtils
@@ -282,15 +282,11 @@ object CheckConnectJvmClientCompatibility {
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.session"),

-    // Artifact Manager
+    // Artifact Manager, client has a totally different implementation.
     ProblemFilters.exclude[MissingClassProblem](
       "org.apache.spark.sql.artifact.ArtifactManager"),
     ProblemFilters.exclude[MissingClassProblem](
       "org.apache.spark.sql.artifact.ArtifactManager$"),
-    ProblemFilters.exclude[MissingClassProblem](
-      "org.apache.spark.sql.artifact.util.ArtifactUtils"),
-    ProblemFilters.exclude[MissingClassProblem](
-      "org.apache.spark.sql.artifact.util.ArtifactUtils$"),

     // UDFRegistration
     ProblemFilters.exclude[DirectMissingMethodProblem](
@@ -391,10 +387,6 @@ object CheckConnectJvmClientCompatibility {
     ProblemFilters.exclude[DirectMissingMethodProblem](
       "org.apache.spark.sql.SparkSession.execute"),
     // Experimental
-    ProblemFilters.exclude[DirectMissingMethodProblem](
-      "org.apache.spark.sql.SparkSession.addArtifact"),
-    ProblemFilters.exclude[DirectMissingMethodProblem](
-      "org.apache.spark.sql.SparkSession.addArtifacts"),
     ProblemFilters.exclude[DirectMissingMethodProblem](
       "org.apache.spark.sql.SparkSession.registerClassFinder"),
     // public
sql/api/src/main/scala/org/apache/spark/sql/Artifact.scala (new file, 160 additions)
@@ -0,0 +1,160 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import java.io.{ByteArrayInputStream, InputStream, PrintStream}
import java.net.URI
import java.nio.file.{Files, Path, Paths}

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.Artifact.LocalData
import org.apache.spark.sql.util.ArtifactUtils
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.MavenUtils


private[sql] class Artifact private(val path: Path, val storage: LocalData) {
  require(!path.isAbsolute, s"Bad path: $path")

  lazy val size: Long = storage match {
    case localData: LocalData => localData.size
  }
}

private[sql] object Artifact {
  val CLASS_PREFIX: Path = Paths.get("classes")
  val JAR_PREFIX: Path = Paths.get("jars")
  val CACHE_PREFIX: Path = Paths.get("cache")

  def newArtifactFromExtension(
      fileName: String,
      targetFilePath: Path,
      storage: LocalData): Artifact = {
    fileName match {
      case jar if jar.endsWith(".jar") =>
        newJarArtifact(targetFilePath, storage)
      case cf if cf.endsWith(".class") =>
        newClassArtifact(targetFilePath, storage)
      case other =>
        throw new UnsupportedOperationException(s"Unsupported file format: $other")
    }
  }

  def parseArtifacts(uri: URI): Seq[Artifact] = {
    // Currently only local files with extensions .jar and .class are supported.
    uri.getScheme match {
      case "file" =>
        val path = Paths.get(uri)
        val artifact = Artifact.newArtifactFromExtension(
          path.getFileName.toString,
          path.getFileName,
          new LocalFile(path))
        Seq[Artifact](artifact)

      case "ivy" =>
        newIvyArtifacts(uri)

      case other =>
        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
    }
  }

  def newJarArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
    newArtifact(JAR_PREFIX, ".jar", targetFilePath, storage)
  }

  def newClassArtifact(targetFilePath: Path, storage: LocalData): Artifact = {
    newArtifact(CLASS_PREFIX, ".class", targetFilePath, storage)
  }

  def newCacheArtifact(id: String, storage: LocalData): Artifact = {
    newArtifact(CACHE_PREFIX, "", Paths.get(id), storage)
  }

  def newIvyArtifacts(uri: URI): Seq[Artifact] = {
    implicit val printStream: PrintStream = System.err

    val authority = uri.getAuthority
    if (authority == null) {
      throw new IllegalArgumentException(
        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
          " Expected 'org:module:version', found null.")
    }
    if (authority.split(":").length != 3) {
      throw new IllegalArgumentException(
        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
          s" Expected 'org:module:version', found $authority.")
    }

    val (transitive, exclusions, repos) = MavenUtils.parseQueryParams(uri)

    val exclusionsList: Seq[String] =
      if (!StringUtils.isBlank(exclusions)) {
        exclusions.split(",").toImmutableArraySeq
      } else {
        Nil
      }

    val ivySettings = MavenUtils.buildIvySettings(Some(repos), None)

    val jars = MavenUtils.resolveMavenCoordinates(
      authority,
      ivySettings,
      transitive = transitive,
      exclusions = exclusionsList)
    jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, new LocalFile(path)))
  }

  private def newArtifact(
      prefix: Path,
      requiredSuffix: String,
      targetFilePath: Path,
      storage: LocalData): Artifact = {
    require(targetFilePath.toString.endsWith(requiredSuffix))
    new Artifact(ArtifactUtils.concatenatePaths(prefix, targetFilePath), storage)
  }

  /**
   * Payload stored on this machine.
   */
  sealed trait LocalData {
    def stream: InputStream

    def size: Long
  }

  /**
   * Payload stored in a local file.
   */
  class LocalFile(val path: Path) extends LocalData {
    override def size: Long = Files.size(path)

    override def stream: InputStream = Files.newInputStream(path)
  }

  /**
   * Payload stored in memory.
   */
  class InMemory(bytes: Array[Byte]) extends LocalData {
    override def size: Long = bytes.length

    override def stream: InputStream = new ByteArrayInputStream(bytes)
  }

}
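Since `Artifact` and its payload types are `private[sql]`, the following sketch only illustrates the factory semantics from inside the `org.apache.spark.sql` package; the byte values and the Ivy coordinates are made up:

```scala
import java.nio.file.Paths
import org.apache.spark.sql.Artifact
import org.apache.spark.sql.Artifact.InMemory

// Fake class-file bytes; a real caller would read compiler output.
val bytes: Array[Byte] = Array(0xCA, 0xFE, 0xBA, 0xBE).map(_.toByte)

// ".class" selects newClassArtifact, which prepends CLASS_PREFIX,
// so art.path == Paths.get("classes/foo/Bar.class").
val art = Artifact.newArtifactFromExtension(
  fileName = "Bar.class",
  targetFilePath = Paths.get("foo/Bar.class"),
  storage = new InMemory(bytes))
assert(art.size == 4) // InMemory reports bytes.length

// Ivy URIs must carry an 'org:module:version' authority; query parameters
// (names assumed here, parsed by MavenUtils.parseQueryParams) tune resolution:
// ivy://com.example:lib:1.0.0?transitive=true&exclude=org.slf4j:slf4j-api
```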
sql/api/src/main/scala/org/apache/spark/sql/api/SparkSession.scala (55 additions, 0 deletions)
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag

 import _root_.java.io.Closeable
 import _root_.java.lang
+import _root_.java.net.URI
 import _root_.java.util

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -312,6 +313,60 @@ abstract class SparkSession[DS[U] <: Dataset[U, DS]] extends Serializable with C
    */
   def sql(sqlText: String): DS[Row] = sql(sqlText, Map.empty[String, Any])

+  /**
+   * Add a single artifact to the current session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(path: String): Unit
+
+  /**
+   * Add a single artifact to the current session.
+   *
+   * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs.
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(uri: URI): Unit
+
+  @Experimental
+  def addArtifact(bytes: Array[Byte], target: String): Unit
+
+  /**
+   * Add a single artifact to the session while preserving the directory structure specified by
+   * `target` under the session's working directory of that particular file extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *   addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+   *   addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+   *   // Directory structure of the session's working directory for class files would look like:
+   *   // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *   // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(source: String, target: String): Unit
+
+  /**
+   * Add one or more artifacts to the session.
+   *
+   * Currently it supports local files with extensions .jar and .class and Apache Ivy URIs
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  @scala.annotation.varargs
+  def addArtifacts(uri: URI*): Unit
+
   /**
    * Executes some code block and prints to stdout the time taken to execute the block. This is
    * available in Scala only and is used primarily for interactive testing and debugging.
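For the two target-preserving variants declared above, a short hedged sketch, reusing the hypothetical `spark` session from the earlier example; all paths remain illustrative:

```scala
import java.net.URI

// Copies each source into the session's working directory for its extension,
// preserving the relative `target` layout.
spark.addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
spark.addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
// Result: ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
//         ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class

// The varargs form accepts a mix of file and ivy URIs.
spark.addArtifacts(
  new URI("file:///Users/dummyUser/files/udfs.jar"),
  new URI("ivy://org.apache.commons:commons-lang3:3.12.0"))
```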
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.artifact.util
+package org.apache.spark.sql.util

 import java.nio.file.{Path, Paths}
