diff --git a/build.sbt b/build.sbt
index 56fa8ce08..be73e8bb5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"
 
 organization := "com.databricks.labs"
 
-version := "0.7.1.0"
+version := "0.7.1.1"
 
 scalaVersion := "2.12.12"
 scalacOptions ++= Seq("-Xmax-classfile-name", "78")
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
index 67817090d..b7b618f44 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala
@@ -15,6 +15,7 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.LongType
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -834,10 +835,20 @@ trait BronzeTransforms extends SparkSessionWrapper {
         .otherwise(col("Stage Attempt ID"))
     } else col("Stage Attempt ID")
 
+    // raw data contains both "Executor ID" and "executorId" at root for different events
+    val executorIdOverride: Column = if(baseEventsDF.columns.contains("Executor ID")) {
+      if (baseEventsDF.columns.contains("executorId")) { // blacklisted executor ids cannot exist if executor ids do not
+        concat(col("Executor ID"), 'executorId)
+      } else col("Executor ID")
+    } else { // handle missing Executor ID field
+      lit(null).cast("long")
+    }
+
     val bronzeSparkEventsScrubber = getSparkEventsSchemaScrubber(baseEventsDF)
 
     val rawScrubbed = if (baseEventsDF.columns.count(_.toLowerCase().replace(" ", "") == "stageid") > 1) {
       baseEventsDF
+        .withColumn("Executor ID", executorIdOverride)
         .withColumn("progress", progressCol)
         .withColumn("filename", input_file_name)
         .withColumn("pathSize", size(split('filename, "/")))
@@ -845,17 +856,18 @@ trait BronzeTransforms extends SparkSessionWrapper {
         .withColumn("clusterId", split('filename, "/")('pathSize - lit(5)))
         .withColumn("StageID", stageIDColumnOverride)
         .withColumn("StageAttemptID", stageAttemptIDColumnOverride)
-        .drop("pathSize", "Stage ID", "stageId", "Stage Attempt ID", "stageAttemptId")
+        .drop("pathSize", "executorId", "Stage ID", "stageId", "Stage Attempt ID", "stageAttemptId")
         .withColumn("filenameGroup", groupFilename('filename))
         .scrubSchema(bronzeSparkEventsScrubber)
     } else {
       baseEventsDF
+        .withColumn("Executor ID", executorIdOverride)
         .withColumn("progress", progressCol)
         .withColumn("filename", input_file_name)
         .withColumn("pathSize", size(split('filename, "/")))
         .withColumn("SparkContextId", split('filename, "/")('pathSize - lit(2)))
         .withColumn("clusterId", split('filename, "/")('pathSize - lit(5)))
-        .drop("pathSize")
+        .drop("pathSize", "executorId")
         .withColumn("filenameGroup", groupFilename('filename))
         .scrubSchema(bronzeSparkEventsScrubber)
     }
@@ -866,7 +878,6 @@ trait BronzeTransforms extends SparkSessionWrapper {
     rawScrubbed.withColumn("Properties", SchemaTools.structToMap(rawScrubbed, "Properties"))
       .withColumn("modifiedConfigs", SchemaTools.structToMap(rawScrubbed, "modifiedConfigs"))
      .withColumn("extraTags", SchemaTools.structToMap(rawScrubbed, "extraTags"))
-      .withColumnRenamed("executorId", "blackListedExecutorIds")
       .join(eventLogsDF, Seq("filename"))
       .withColumn("organization_id", lit(organizationId))
       .withColumn("Properties", expr("map_filter(Properties, (k,v) -> k not in ('sparkexecutorextraClassPath'))"))