Skip to content

Commit

Permalink
Comvert all the struct field inside 'spec' column for cluster_snapsho…
Browse files Browse the repository at this point in the history
…t_bronze to mapType
  • Loading branch information
souravbaner-da committed Jun 11, 2024
1 parent da14f88 commit a5c8b54
Showing 1 changed file with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -552,45 +552,35 @@ trait BronzeTransforms extends SparkSessionWrapper {

val rawDF = deriveRawApiResponseDF(spark.read.json(tmpClusterSnapshotSuccessPath))
if (rawDF.columns.contains("cluster_id")) {
val outputDF = SchemaScrubber.scrubSchema(rawDF)
val finalDF = outputDF.withColumn("default_tags", SchemaTools.structToMap(outputDF, "default_tags"))
.withColumn("custom_tags", SchemaTools.structToMap(outputDF, "custom_tags"))
.withColumn("spark_conf", SchemaTools.structToMap(outputDF, "spark_conf"))
.withColumn("spark_env_vars", SchemaTools.structToMap(outputDF, "spark_env_vars"))
.withColumn(s"aws_attributes", SchemaTools.structToMap(outputDF, s"aws_attributes"))
.withColumn(s"azure_attributes", SchemaTools.structToMap(outputDF, s"azure_attributes"))
.withColumn(s"gcp_attributes", SchemaTools.structToMap(outputDF, s"gcp_attributes"))
val scrubbedDF = SchemaScrubber.scrubSchema(rawDF)
val df = scrubbedDF.withColumn("default_tags", SchemaTools.structToMap(scrubbedDF, "default_tags"))
.withColumn("custom_tags", SchemaTools.structToMap(scrubbedDF, "custom_tags"))
.withColumn("spark_conf", SchemaTools.structToMap(scrubbedDF, "spark_conf"))
.withColumn("spark_env_vars", SchemaTools.structToMap(scrubbedDF, "spark_env_vars"))
.withColumn(s"aws_attributes", SchemaTools.structToMap(scrubbedDF, s"aws_attributes"))
.withColumn(s"azure_attributes", SchemaTools.structToMap(scrubbedDF, s"azure_attributes"))
.withColumn(s"gcp_attributes", SchemaTools.structToMap(scrubbedDF, s"gcp_attributes"))
.withColumn("organization_id", lit(config.organizationId))
.verifyMinimumSchema(clusterSnapMinimumSchema)

val explodedDF = finalDF
.withColumnRenamed("custom_tags", "custom_tags_old")
.selectExpr("*", "spec.custom_tags")

val normalizedDf = explodedDF.withColumn("custom_tags", SchemaTools.structToMap(explodedDF, "custom_tags"))

// Replace the custom_tags field inside the spec struct with custom_tags outside of spec column
val updatedDf = normalizedDf.schema.fields.find(_.name == "spec") match {
val finalDF = df.schema.fields.find(_.name == "spec") match {
case Some(field) =>
field.dataType match {
case structType: StructType =>
// Create a new struct expression, replacing the specified field with the new column
val newFields = structType.fields.map { f =>
if (f.name.equalsIgnoreCase("custom_tags")) {
col("custom_tags").as("custom_tags") // Replace with new column if names match
} else {
col(s"spec.${f.name}") // Keep existing fields as is
f.dataType match {
case _: StructType => SchemaTools.structToMap(df, s"spec.${f.name}").as(f.name)
case _ => col(s"spec.${f.name}").as(f.name)
}
}
// Update the DataFrame with the new struct replacing the old one
normalizedDf.withColumn("spec", struct(newFields: _*))
case _ => normalizedDf // No action if the specified structColName is not a struct type
df.withColumn("spec", struct(newFields: _*))
case _ => df
}
case None => normalizedDf // No action if the specified structColName does not exist
case None => df
}

updatedDf.drop("custom_tags")
.withColumnRenamed("custom_tags_old", "custom_tags")
finalDF
.verifyMinimumSchema(clusterSnapMinimumSchema)

} else {
throw new NoNewDataException(msg, Level.WARN, true)
Expand Down

0 comments on commit a5c8b54

Please sign in to comment.