From a5c8b5469eef3fcfd7629c2861e7e5f0c9a414bd Mon Sep 17 00:00:00 2001 From: "sourav.banerjee" Date: Tue, 11 Jun 2024 17:52:52 +0530 Subject: [PATCH] Convert all the struct fields inside 'spec' column for cluster_snapshot_bronze to mapType --- .../overwatch/pipeline/BronzeTransforms.scala | 44 +++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala index fa0f6747c..6f24309f3 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/BronzeTransforms.scala @@ -552,45 +552,35 @@ trait BronzeTransforms extends SparkSessionWrapper { val rawDF = deriveRawApiResponseDF(spark.read.json(tmpClusterSnapshotSuccessPath)) if (rawDF.columns.contains("cluster_id")) { - val outputDF = SchemaScrubber.scrubSchema(rawDF) - val finalDF = outputDF.withColumn("default_tags", SchemaTools.structToMap(outputDF, "default_tags")) - .withColumn("custom_tags", SchemaTools.structToMap(outputDF, "custom_tags")) - .withColumn("spark_conf", SchemaTools.structToMap(outputDF, "spark_conf")) - .withColumn("spark_env_vars", SchemaTools.structToMap(outputDF, "spark_env_vars")) - .withColumn(s"aws_attributes", SchemaTools.structToMap(outputDF, s"aws_attributes")) - .withColumn(s"azure_attributes", SchemaTools.structToMap(outputDF, s"azure_attributes")) - .withColumn(s"gcp_attributes", SchemaTools.structToMap(outputDF, s"gcp_attributes")) + val scrubbedDF = SchemaScrubber.scrubSchema(rawDF) + val df = scrubbedDF.withColumn("default_tags", SchemaTools.structToMap(scrubbedDF, "default_tags")) + .withColumn("custom_tags", SchemaTools.structToMap(scrubbedDF, "custom_tags")) + .withColumn("spark_conf", SchemaTools.structToMap(scrubbedDF, "spark_conf")) + .withColumn("spark_env_vars", SchemaTools.structToMap(scrubbedDF, "spark_env_vars")) + 
.withColumn(s"aws_attributes", SchemaTools.structToMap(scrubbedDF, s"aws_attributes")) + .withColumn(s"azure_attributes", SchemaTools.structToMap(scrubbedDF, s"azure_attributes")) + .withColumn(s"gcp_attributes", SchemaTools.structToMap(scrubbedDF, s"gcp_attributes")) .withColumn("organization_id", lit(config.organizationId)) .verifyMinimumSchema(clusterSnapMinimumSchema) - val explodedDF = finalDF - .withColumnRenamed("custom_tags", "custom_tags_old") - .selectExpr("*", "spec.custom_tags") - - val normalizedDf = explodedDF.withColumn("custom_tags", SchemaTools.structToMap(explodedDF, "custom_tags")) - - // Replace the custom_tags field inside the spec struct with custom_tags outside of spec column - val updatedDf = normalizedDf.schema.fields.find(_.name == "spec") match { + val finalDF = df.schema.fields.find(_.name == "spec") match { case Some(field) => field.dataType match { case structType: StructType => - // Create a new struct expression, replacing the specified field with the new column val newFields = structType.fields.map { f => - if (f.name.equalsIgnoreCase("custom_tags")) { - col("custom_tags").as("custom_tags") // Replace with new column if names match - } else { - col(s"spec.${f.name}") // Keep existing fields as is + f.dataType match { + case _: StructType => SchemaTools.structToMap(df, s"spec.${f.name}").as(f.name) + case _ => col(s"spec.${f.name}").as(f.name) } } - // Update the DataFrame with the new struct replacing the old one - normalizedDf.withColumn("spec", struct(newFields: _*)) - case _ => normalizedDf // No action if the specified structColName is not a struct type + df.withColumn("spec", struct(newFields: _*)) + case _ => df } - case None => normalizedDf // No action if the specified structColName does not exist + case None => df } - updatedDf.drop("custom_tags") - .withColumnRenamed("custom_tags_old", "custom_tags") + finalDF + .verifyMinimumSchema(clusterSnapMinimumSchema) } else { throw new NoNewDataException(msg, Level.WARN, true)