Skip to content

Commit

Permalink
686 - SparkEvents Executor ID Schema Handler
Browse files Browse the repository at this point in the history
  • Loading branch information
GeekSheikh committed Jan 9, 2023
1 parent ed8c521 commit 01e2192
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.7.1.0"
version := "0.7.1.1"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.util.SerializableConfiguration

Expand Down Expand Up @@ -834,28 +835,39 @@ trait BronzeTransforms extends SparkSessionWrapper {
.otherwise(col("Stage Attempt ID"))
} else col("Stage Attempt ID")

// Raw event data carries the executor identifier under two different root keys, "Executor ID" and
// "executorId", used by different events, so a given row is expected to populate at most one of
// them. This column merges both keys into the single "Executor ID" column.
//
// coalesce is used instead of concat: Spark's concat evaluates to NULL as soon as any input is
// NULL, which would blank the identifier on every row that only populates one of the two keys.
val executorIdOverride: Column = {
  val hasExecutorId = baseEventsDF.columns.contains("Executor ID")
  val hasAltExecutorId = baseEventsDF.columns.contains("executorId")

  if (hasExecutorId && hasAltExecutorId) {
    // Cast both sides to string so the merged column keeps a string type regardless of how each
    // key was inferred from the raw JSON.
    coalesce(col("Executor ID").cast("string"), col("executorId").cast("string"))
  } else if (hasExecutorId) {
    col("Executor ID")
  } else {
    // "Executor ID" is absent from this batch. "executorId" is deliberately not consulted here:
    // blacklisted executor ids cannot exist if executor ids do not.
    // NOTE(review): this placeholder is typed long while the merged branch is string — confirm
    // the downstream schema scrubber reconciles the two.
    lit(null).cast("long")
  }
}

// Schema scrubber obtained from getSparkEventsSchemaScrubber for the raw events DataFrame; it is
// handed to scrubSchema as the final step when rawScrubbed is built.
val bronzeSparkEventsScrubber = getSparkEventsSchemaScrubber(baseEventsDF)

// Builds the scrubbed raw events DataFrame:
//   1. replaces "Executor ID" with the merged executor id column and attaches progress,
//   2. records the source file name and derives SparkContextId / clusterId from fixed positions
//      in the "/"-split file path,
//   3. when the input has more than one stage-id style column, collapses them into
//      StageID / StageAttemptID and drops the originals,
//   4. drops helper columns, tags the filename group and applies the schema scrubber.
val rawScrubbed = {
  // More than one column whose name reduces to "stageid" (case and spaces ignored) means both
  // "Stage ID" and "stageId" are present and must be reconciled.
  val hasDuplicateStageId =
    baseEventsDF.columns.count(_.toLowerCase().replace(" ", "") == "stageid") > 1

  // Enrichment shared by both layouts.
  val withFileMeta = baseEventsDF
    .withColumn("Executor ID", executorIdOverride)
    .withColumn("progress", progressCol)
    .withColumn("filename", input_file_name)
    .withColumn("pathSize", size(split('filename, "/")))
    .withColumn("SparkContextId", split('filename, "/")('pathSize - lit(2)))
    .withColumn("clusterId", split('filename, "/")('pathSize - lit(5)))

  // "executorId" is dropped in both cases because its values are already folded into
  // "Executor ID" by executorIdOverride; "pathSize" was only needed to index the split path.
  val normalized = if (hasDuplicateStageId) {
    withFileMeta
      .withColumn("StageID", stageIDColumnOverride)
      .withColumn("StageAttemptID", stageAttemptIDColumnOverride)
      .drop("pathSize", "executorId", "Stage ID", "stageId", "Stage Attempt ID", "stageAttemptId")
  } else {
    withFileMeta.drop("pathSize", "executorId")
  }

  normalized
    .withColumn("filenameGroup", groupFilename('filename))
    .scrubSchema(bronzeSparkEventsScrubber)
}
Expand All @@ -866,7 +878,6 @@ trait BronzeTransforms extends SparkSessionWrapper {
rawScrubbed.withColumn("Properties", SchemaTools.structToMap(rawScrubbed, "Properties"))
.withColumn("modifiedConfigs", SchemaTools.structToMap(rawScrubbed, "modifiedConfigs"))
.withColumn("extraTags", SchemaTools.structToMap(rawScrubbed, "extraTags"))
.withColumnRenamed("executorId", "blackListedExecutorIds")
.join(eventLogsDF, Seq("filename"))
.withColumn("organization_id", lit(organizationId))
.withColumn("Properties", expr("map_filter(Properties, (k,v) -> k not in ('sparkexecutorextraClassPath'))"))
Expand Down

0 comments on commit 01e2192

Please sign in to comment.