0812 release #1249

Merged 5 commits on Jun 24, 2024
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.1.1"
version := "0.8.1.2"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
139 changes: 139 additions & 0 deletions src/main/resources/AWS_Instance_Details.csv

Large diffs are not rendered by default.

23 changes: 22 additions & 1 deletion src/main/resources/Gcp_Instance_Details.csv
@@ -81,4 +81,25 @@ n2d-standard-64,n2d-standard-64,64,256.0,6.336,6.336,11.52
n2d-standard-8,n2d-standard-8,8,32.0,0.792,0.792,1.44
n2d-standard-80,n2d-standard-80,80,320.0,7.92,7.92,14.4
n2d-standard-96,n2d-standard-96,96,384.0,9.504,9.504,17.28
a2-megagpu-16g,a2-megagpu-16g,96,1360.0,35.112,35.112,63.84
a2-megagpu-16g,a2-megagpu-16g,96,1360.0,35.112,35.112,63.84
a2-ultragpu-1g,a2-ultragpu-1g,12,170.0,4.389,4.389,7.98
a2-ultragpu-2g,a2-ultragpu-2g,24,340.0,8.778,8.778,15.96
a2-ultragpu-4g,a2-ultragpu-4g,48,680.0,17.556,17.556,31.92
a2-ultragpu-8g,a2-ultragpu-8g,96,1360.0,35.112,35.112,63.84
e2_highcpu_16,e2_highcpu_16,16,16.0,1.188,1.188,2.16
e2_highmem_16,e2_highmem_16,16,128.0,2.112,2.112,3.84
e2_highmem_2,e2_highmem_2,2,16.0,0.264,0.264,0.48
e2_highmem_4,e2_highmem_4,4,32.0,0.528,0.528,0.96
e2_highmem_8,e2_highmem_8,8,64.0,1.056,1.056,1.92
e2_standard_16,e2_standard_16,16,64.0,1.584,1.584,2.88
e2_standard_32,e2_standard_32,32,128.0,3.168,3.168,5.76
e2_standard_4,e2_standard_4,4,16.0,0.396,0.396,0.72
e2_standard_8,e2_standard_8,8,32.0,0.792,0.792,1.44
g2-standard-12,g2-standard-12,12,48.0,2.376,2.376,4.32
g2-standard-16,g2-standard-16,16,64.0,3.168,3.168,5.76
g2-standard-24,g2-standard-24,24,96.0,4.752,4.752,8.64
g2-standard-32,g2-standard-32,32,128.0,6.336,6.336,11.52
g2-standard-4,g2-standard-4,4,16.0,0.792,0.792,1.44
g2-standard-48,g2-standard-48,48,192.0,9.504,9.504,17.28
g2-standard-8,g2-standard-8,8,32.0,1.584,1.584,2.88
g2-standard-96,g2-standard-96,96,384.0,19.008,19.008,34.56
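
The rows above extend the GCP instance-details lookup used for cluster cost attribution. As a hedged, spark-shell-style sketch (not the project's documented loader), a resource file like this can be inspected with a plain CSV read; whether the file carries a header row, and what the price/DBU columns are named, are assumptions not confirmed by this diff:

import org.apache.spark.sql.SparkSession

// Minimal local inspection of the instance-details resource added above.
// The path is repo-relative; header handling is an assumption.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val gcpDetails = spark.read
  .option("header", "false")        // assumed: the rows shown in this diff look header-less
  .option("inferSchema", "true")
  .csv("src/main/resources/Gcp_Instance_Details.csv")

gcpDetails.show(5, truncate = false)
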
@@ -552,46 +552,17 @@ trait BronzeTransforms extends SparkSessionWrapper {

val rawDF = deriveRawApiResponseDF(spark.read.json(tmpClusterSnapshotSuccessPath))
if (rawDF.columns.contains("cluster_id")) {
val outputDF = SchemaScrubber.scrubSchema(rawDF)
val finalDF = outputDF.withColumn("default_tags", SchemaTools.structToMap(outputDF, "default_tags"))
.withColumn("custom_tags", SchemaTools.structToMap(outputDF, "custom_tags"))
.withColumn("spark_conf", SchemaTools.structToMap(outputDF, "spark_conf"))
.withColumn("spark_env_vars", SchemaTools.structToMap(outputDF, "spark_env_vars"))
.withColumn(s"aws_attributes", SchemaTools.structToMap(outputDF, s"aws_attributes"))
.withColumn(s"azure_attributes", SchemaTools.structToMap(outputDF, s"azure_attributes"))
.withColumn(s"gcp_attributes", SchemaTools.structToMap(outputDF, s"gcp_attributes"))
val scrubbedDF = SchemaScrubber.scrubSchema(rawDF)
val df = scrubbedDF.withColumn("default_tags", SchemaTools.structToMap(scrubbedDF, "default_tags"))
.withColumn("custom_tags", SchemaTools.structToMap(scrubbedDF, "custom_tags"))
.withColumn("spark_conf", SchemaTools.structToMap(scrubbedDF, "spark_conf"))
.withColumn("spark_env_vars", SchemaTools.structToMap(scrubbedDF, "spark_env_vars"))
.withColumn(s"aws_attributes", SchemaTools.structToMap(scrubbedDF, s"aws_attributes"))
.withColumn(s"azure_attributes", SchemaTools.structToMap(scrubbedDF, s"azure_attributes"))
.withColumn(s"gcp_attributes", SchemaTools.structToMap(scrubbedDF, s"gcp_attributes"))
.withColumn("organization_id", lit(config.organizationId))
.verifyMinimumSchema(clusterSnapMinimumSchema)

val explodedDF = finalDF
.withColumnRenamed("custom_tags", "custom_tags_old")
.selectExpr("*", "spec.custom_tags")

val normalizedDf = explodedDF.withColumn("custom_tags", SchemaTools.structToMap(explodedDF, "custom_tags"))

// Replace the custom_tags field inside the spec struct with custom_tags outside of spec column
val updatedDf = normalizedDf.schema.fields.find(_.name == "spec") match {
case Some(field) =>
field.dataType match {
case structType: StructType =>
// Create a new struct expression, replacing the specified field with the new column
val newFields = structType.fields.map { f =>
if (f.name.equalsIgnoreCase("custom_tags")) {
col("custom_tags").as("custom_tags") // Replace with new column if names match
} else {
col(s"spec.${f.name}") // Keep existing fields as is
}
}
// Update the DataFrame with the new struct replacing the old one
normalizedDf.withColumn("spec", struct(newFields: _*))
case _ => normalizedDf // No action if the specified structColName is not a struct type
}
case None => normalizedDf // No action if the specified structColName does not exist
}

updatedDf.drop("custom_tags")
.withColumnRenamed("custom_tags_old", "custom_tags")

.drop("spec")
df.verifyMinimumSchema(clusterSnapMinimumSchema)
} else {
throw new NoNewDataException(msg, Level.WARN, true)
}
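
For context on the simplified snapshot transform above: SchemaTools.structToMap flattens a struct column whose keys vary across workspaces (default_tags, custom_tags, spark_conf, ...) into a MapType column so the bronze schema stays stable. A minimal sketch of that idea, assuming a top-level struct column; this is illustrative, not Overwatch's actual implementation:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, map}
import org.apache.spark.sql.types.StructType

// Turn struct<k1: string, k2: string, ...> into map<string, string> so that
// differing tag keys across clusters no longer change the column's schema.
def structToMapSketch(df: DataFrame, structCol: String): Column = {
  val fieldNames = df.schema(structCol).dataType.asInstanceOf[StructType].fieldNames
  val kvPairs = fieldNames.flatMap(f => Seq(lit(f), col(s"$structCol.$f").cast("string")))
  map(kvPairs: _*).alias(structCol)
}

// Usage (illustrative): df.withColumn("custom_tags", structToMapSketch(df, "custom_tags"))
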
@@ -1325,10 +1325,10 @@ object WorkflowsTransforms extends SparkSessionWrapper {
clusterStateEndOrPipelineEnd.alias("unixTimeMS_state_end"), // if clusterState still open -- close it for calculations
'timestamp_state_start,
'timestamp_state_end, 'state, 'cloud_billable, 'databricks_billable, 'uptime_in_state_H, 'current_num_workers, 'target_num_workers,
$"driverSpecs.API_Name".alias("driver_node_type_id"),
coalesce('driver_node_type_id, $"driverSpecs.API_Name").alias("driver_node_type_id"),
$"driverSpecs.Compute_Contract_Price".alias("driver_compute_hourly"),
$"driverSpecs.Hourly_DBUs".alias("driver_dbu_hourly"),
$"workerSpecs.API_Name".alias("node_type_id"),
coalesce('node_type_id, $"workerSpecs.API_Name").alias("node_type_id"),
$"workerSpecs.Compute_Contract_Price".alias("worker_compute_hourly"),
$"workerSpecs.Hourly_DBUs".alias("worker_dbu_hourly"),
$"workerSpecs.vCPUs".alias("worker_cores"),
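
The two coalesce calls above prefer the node type already recorded on the cluster-state row and only fall back to the instance-spec lookup when that value is null. A small, self-contained illustration of that fallback; the column names and values here are illustrative, not the pipeline's full schema:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val states = Seq(
  (Some("n2d-standard-8"), "n2d-standard-4"), // id already on the record wins
  (None, "n2d-standard-4")                    // null id falls back to the spec-derived value
).toDF("node_type_id", "spec_api_name")

states.select(coalesce($"node_type_id", $"spec_api_name").alias("node_type_id")).show()
// yields n2d-standard-8, then n2d-standard-4
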
@@ -86,10 +86,12 @@ class SchemaScrubber(
s"DUPLICATE FIELDS:\n" +
s"${dups.mkString("\n")}"
logger.log(Level.WARN, warnMsg)
val counterMap = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
fields.map(f => {
val fieldName = if (caseSensitive) f.sanitizedField.name else f.sanitizedField.name.toLowerCase
val fieldName = if (caseSensitive) f.sanitizedField.name.trim else f.sanitizedField.name.toLowerCase.trim
if (dups.contains(fieldName)) {
val generatedUniqueName = f.sanitizedField.name + "_UNIQUESUFFIX_" + f.originalField.name.hashCode.toString
counterMap(fieldName) += 1
val generatedUniqueName = f.sanitizedField.name.trim + "_UNIQUESUFFIX_" + f.originalField.name.trim.hashCode.toString + "_" + counterMap(fieldName)
val uniqueColumnMapping = s"\n${f.originalField.name} --> ${generatedUniqueName}"
logger.log(Level.WARN, uniqueColumnMapping)
f.sanitizedField.copy(name = generatedUniqueName)
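
The counterMap above keeps duplicate sanitized field names distinct even when the trimmed original names hash to the same value, by appending an incrementing per-name counter to the generated suffix. A minimal sketch of that disambiguation logic; the field names are illustrative and case handling is simplified:

import scala.collection.mutable

val counterMap = mutable.Map[String, Int]().withDefaultValue(0)

// Mirrors the idea above: suffix = sanitized name + original-name hash + per-name counter.
def uniqueName(sanitizedName: String, originalName: String): String = {
  val key = sanitizedName.toLowerCase.trim
  counterMap(key) += 1
  s"${sanitizedName.trim}_UNIQUESUFFIX_${originalName.trim.hashCode}_${counterMap(key)}"
}

uniqueName("dup1", "dup 1") // dup1_UNIQUESUFFIX_<hash>_1
uniqueName("dup1", "dup 1") // dup1_UNIQUESUFFIX_<hash>_2  -- same hash, counter disambiguates
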
@@ -270,8 +270,8 @@ class SchemaToolsTest extends AnyFunSpec with SparkSessionTestWrapper with Given

val expectedResString = "`b_2_2_2` STRUCT<`abc`: STRING, `c_1__45`: BIGINT>,`exception_parent` " +
"STRUCT<`dup1`: BIGINT, `dup2`: BIGINT, `xyz`: STRUCT<`_mixed`: BIGINT, `_bad`: BIGINT, " +
"`dup1_UNIQUESUFFIX_95946320`: BIGINT, `dup1_UNIQUESUFFIX_95946320`: BIGINT, `dup2_UNIQUESUFFIX_3095059`: " +
"BIGINT, `dup2_UNIQUESUFFIX_3095059`: STRING, `good_col`: BIGINT, `jkl`: BIGINT, `otherexcept`: BIGINT>, " +
"`dup1_UNIQUESUFFIX_95946320_1`: BIGINT, `dup1_UNIQUESUFFIX_95946320_2`: BIGINT, `dup2_UNIQUESUFFIX_3095059_1`: " +
"BIGINT, `dup2_UNIQUESUFFIX_3095059_2`: STRING, `good_col`: BIGINT, `jkl`: BIGINT, `otherexcept`: BIGINT>, " +
"`zyx`: BIGINT>,`i_1` BIGINT,`parentwspace` STRING,`validParent` STRING"
val ddlFromLogic = df.scrubSchema(exceptionScrubber).schema.toDDL
assertResult(expectedResString) {