Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA] Support Multiple Spark versions in the same jar #355

Closed
tgravescs opened this issue Jul 14, 2020 · 5 comments · Fixed by #414
Closed

[FEA] Support Multiple Spark versions in the same jar #355

tgravescs opened this issue Jul 14, 2020 · 5 comments · Fixed by #414
Assignees
Labels
feature request New feature or request P0 Must have for release

Comments

@tgravescs
Copy link
Collaborator

It would be nice if we can have a single spark rapids jar that supports multiple versions of Spark (spark 3.0, spark 3.1, etc), including different distributions based on the same Spark version (GCP, Databricks, AWS, etc)

Types of incompatibilities it would need ti handle:

  • Simple change in function definition in Spark
  • Classes take different parameters
  • Classes move packages - harder if we inherit from them
  • Catalyst expressions/execs/etc removed or added

Proposal to handle:
Create a ShimLoader class that looks at the spark version and then loads the proper rapids subclass. This can have multiple different classes. For instance a SparkShim layer that is wrapper around simple function calls, we could have a loader for different classes that share a base class. For instance GpuBroadcastNestedLoopJoinExec could have a GpuBroadcastNestedLoopJoinExecBase class and then specific versions GpuBroadcastNestedLoopJoinExec30 and GpuBroadcastNestedLoopJoinExec31.

Each Spark version then gets its own submodule where all the specific classes for Spark3.0, Spark 3.1, Spark 3.0 databricks etc. all live. The Loader will look at the version and then load the corresponding class.

Some spark distributions make things a bit harder in that we can’t compile it at the same time as Spark 3.0 and spark 3.1. So for those jars we will have to build them separately and ideally pull them as a dependency to package up and distribute a single jar. The Apache spark 3.0 and spark 3.1 jars can be built in a single build.

To handle expressions and exec being added and removed we can have the GpuOverrides call down into the ShimLoader to get any version specific ones. So for instance TimeSub in Spark 3.1 got changed to TimeAdd. The GpuOverrides definition of exprs would leave it out but then call into the shim layer to get specific versions (++ ShimLoader.getSparkShims.getExecs).

@tgravescs tgravescs added feature request New feature or request ? - Needs Triage Need team to review and classify labels Jul 14, 2020
@tgravescs
Copy link
Collaborator Author

Some specific things that have changed in Spark 3.1 that we need to handle:

  • spark3.1 BuildRight,BuildLeft,BuildSide moved packages and they reorganized base class of HashJoin. We inherit from HashJoin so it requires us to move that into the shim and make GpuHashJoin specific versions for Spark 3.0 and Spark 3.1.
  • TimeSub changed to TimeAdd in Spark 3.1
  • ShuffleWriter interface changing getReaderForRange moved to GetReader
  • Databricks also has some functions with names and return types changed, we are currently using reflection for those, but could potentially be moved to shim layer.
  • DatasourceScanExec trait added maxMetadataValueLength. This will need to move into the Spark version specific module
  • FileSourceScanExec added a parameter of optionalNumCoalescedBuckets

@tgravescs tgravescs removed the ? - Needs Triage Need team to review and classify label Jul 14, 2020
@tgravescs tgravescs self-assigned this Jul 14, 2020
@tgravescs
Copy link
Collaborator Author

tgravescs commented Jul 14, 2020

I have a prototype working and this is what the api and high level design look like:

Directory structure for modules, we keep the existing modules and just add version specific ones:

dist
integration_tests
shuffle-plugin
spark30 *** new
spark31 *** new
spark30databricks *** new
sql-plugin
target
tests

The new modules need a dependency on the sql-plugin module and each module builds against the appropriate version of Spark. spark30 - pom file uses spark.version=3.0.0 and spark31 uses spark.version=3.1.0-SNAPSHOT. All those modules create their own jar and then in our dist/ module we package them all together into a rapids-4-spark_2.12-{version}.jar. This does require that each module use its own package name or separate class names. for instance com.nvidia.spark.rapids.shims.spark30 vs com.nvidia.spark.rapids.shims.spark31, or we could use make sure class names are different: GpuShuffledHashJoinExec30 vs GpuShuffledHashJoinExec31.

For Spark distribution specific - Databricks, GCP, AWS, we will have to build directly against their jars. So those submodules would be built against those jars and then pulled as dependencies. The combined jar can then be run through integration tests on each of the versions and distributions.

The high level design of the Shimloader:

object ShimLoader  {

  val SPARK30DATABRICKSSVERSIONNAME = "3.0.0-databricks"
  val SPARK30VERSIONNAME = "3.0.0"
  val SPARK31VERSIONNAME = "3.1.0-SNAPSHOT"

  private val SPARK_SHIM_CLASSES = HashMap(
    SPARK30VERSIONNAME -> "com.nvidia.spark.rapids.shims.Spark30Shims",
    SPARK30DATABRICKSSVERSIONNAME -> "com.nvidia.spark.rapids.shims.Spark300DatabricksShims",
    SPARK31VERSIONNAME -> "com.nvidia.spark.rapids.shims.Spark31Shims",
  )
  def getSparkShims: SparkShims = {
    if (sparkShims == null) {
      sparkShims = loadShims(SPARK_SHIM_CLASSES, classOf[SparkShims])
    }
    sparkShims
  }

  private def loadShims[T](classMap: Map[String, String], xface: Class[T]): T = {
    val vers = getVersion();
    val className = classMap.get(vers)
    if (className.isEmpty) {
      throw new Exception(s"No shim layer for $vers")
    }
    createShim(className.get, xface)
  }

  private def createShim[T](className: String, xface: Class[T]): T = try {
    val clazz = Class.forName(className)
    val res = clazz.newInstance().asInstanceOf[T]
    res
  } catch {
    case e: Exception => throw new RuntimeException("Could not load shims in class " + className, e)
  }

  def getVersion(): String = {  
    if (checkIfDatabricks()) {
        SPARK_VERSION + "-databricks"
    } else {
      SPARK_VERSION
    }
  }

The base SparkShims trait which each version would implement would be something like this:

trait SparkShims {

  def isGpuHashJoin(plan: SparkPlan): Boolean
  def getBuildSide(join: ShuffledHashJoinExec): GpuBuildSide
  def getExprs: Seq[ExprRule[_ <: Expression]]
  def getExecs: Seq[ExecRule[_ <: SparkPlan]]

}

Example implementation for Spark 3.0 would look like:

class Spark30Shims extends SparkShims with Logging {

  def isGpuHashJoin(plan: SparkPlan): Boolean = {
    plan match {
      case _: GpuHashJoin30 => true
      case p => false
    }
  }

  def getExecs: Seq[ExecRule[_ <: SparkPlan]] = {
    Seq(
    GpuOverrides.exec[SortMergeJoinExec](
      "Sort merge join, replacing with shuffled hash join",
      (join, conf, p, r) => new GpuSortMergeJoinMeta30(join, conf, p, r)),
    GpuOverrides.exec[BroadcastHashJoinExec](
      "Implementation of join using broadcast data",
      (join, conf, p, r) => new GpuBroadcastHashJoinMeta30(join, conf, p, r)),
    GpuOverrides.exec[ShuffledHashJoinExec](
      "Implementation of join using hashed shuffled data",
      (join, conf, p, r) => new GpuShuffledHashJoinMeta30(join, conf, p, r)),
    )
  }

Examples of how this would be called from the sql-plugin code:

case bkj: BroadcastHashJoinExec => ShimLoader.getSparkShims.getBuildSide(bkj)
GpuOverrides would have the common expressions and then call into Shim to get specific ones:

  val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = (Seq(
    exec[GenerateExec] (
      "The backend for operations that generate more output rows than input rows like explode.",
      (gen, conf, p, r) => new GpuGenerateExecSparkPlanMeta(gen, conf, p, r)),
    exec[ProjectExec](
      "The backend for most select, withColumn and dropColumn statements",
      (proj, conf, p, r) => {
        new SparkPlanMeta[ProjectExec](proj, conf, p, r) {
          override def convertToGpu(): GpuExec =
            GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded())
        }
      }),
    exec[BatchScanExec](
      "The backend for most file input",
      (p, conf, parent, r) => new SparkPlanMeta[BatchScanExec](p, conf, parent, r) {
        override val childScans: scala.Seq[ScanMeta[_]] =
          Seq(GpuOverrides.wrapScan(p.scan, conf, Some(this)))

        override def convertToGpu(): GpuExec =
          GpuBatchScanExec(p.output, childScans(0).convertToGpu())
      }),
...
...  rest of the common execs....
...
  ) ++ShimLoader.getSparkShims.getExecs).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap

Here you notice the ++ ShimLoader.getSparkShims.getExecs call where it would add in any specific execs for the version of Spark you are running against

@jlowe
Copy link
Member

jlowe commented Jul 16, 2020

Would it make sense to organize the various shims into their own submodule, e.g.: spark-shims or just shims? I see the following benefits:

  • top-level directory doesn't get cluttered with shims as we support more Spark versions
  • dist can depend on the parent shims artifact to pull in all shims, so adding a new shim only needs to update the parent shim pom.

@tgravescs
Copy link
Collaborator Author

The problem with putting them into another submodule is you run into cyclic dependency issues. I started out having them in their own and it caused a lot of issues. unless you had specific ideas.

  • spark-shims needs base classes from sql-plugin because you move version specific classes in there.
    -sql-plugin needs access to shimLoader from GpuOverrides and a few other places in order to add them to the list

Now, I had things mostly working by pulling GpuOverrides and a few other classes out and move them into the spark-shim layer itself but there were still a few issues to resolve and changeing this approach made that problem go away entirely.

@jlowe
Copy link
Member

jlowe commented Jul 17, 2020

There already is a circular dependency in this design, and it only works because this design uses reflection to avoid the explicit dependency. I think we could solve the issue a bit cleaner like this:

  • spark-shims is just an aggregating parent module. It provides no code itself, rather just packages its child modules into a jar.
  • Each shim project underneath spark-shims depends on sql-plugin as it does in the original design.
  • sql-plugin uses ServiceLoader to load the shim probes, polling each probe instance until it finds one that is willing to claim it supports the current version of Spark.

For example, sql-plugin would provide a probe or detector interface, something like this:

trait ShimProbe {
  def buildSparkShim(sparkVersion: String): Option[SparkShim]
}

sql-plugin would then use ServiceLoader to load all dynamically detected shim probes, querying each one in turn until it found one that doesn't return None when it tries to build a shim. The probe class intentionally is lightweight and doesn't need to instantiate a full shim instance (and all dependent class instances) to answer the question of whether this Spark version is supported. Each shim project can then provide a specific probe and shim for that Spark version along with the META-INF/services/ entry for their probe class so the service loader can find them. The nice thing is then each shim project is isolated, and adding a new shim only updates the parent shim project to add it as a child and everything else is contained within that specific shim project.

@sameerz sameerz added the P0 Must have for release label Jul 22, 2020
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request P0 Must have for release
Projects
None yet
3 participants