From 3055a22736211248fd3d21dbf158978e40faa287 Mon Sep 17 00:00:00 2001
From: Aman <91308367+aman-db@users.noreply.github.com>
Date: Mon, 12 Aug 2024 22:59:13 +0530
Subject: [PATCH] 1218 warehouse state details (#1254)

* test
* code for warehouse_state_detail_silver
* removed comments
* adding warehouseEvents scope
* added exception for table not found
* added exception to check if system tables are getting used or not
* enhance function getWarehousesEventDF
* added code to fix max number of clusters
* change in column names
* refactored code
---
 .../overwatch/MultiWorkspaceDeployment.scala  |   2 +-
 .../labs/overwatch/env/Workspace.scala        |  36 ++
 .../labs/overwatch/pipeline/Bronze.scala      |   2 -
 .../pipeline/InitializerFunctions.scala       |   1 +
 .../overwatch/pipeline/PipelineTargets.scala  |  10 +
 .../labs/overwatch/pipeline/Silver.scala      |  24 +-
 .../overwatch/pipeline/SilverTransforms.scala | 321 +++++++++++++++++-
 .../labs/overwatch/utils/Config.scala         |   3 +-
 .../labs/overwatch/utils/Structures.scala     |   4 +-
 9 files changed, 390 insertions(+), 13 deletions(-)

diff --git a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
index 24c5de6a1..9913d1b22 100644
--- a/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
@@ -122,7 +122,7 @@
     val sqlComputerDBUPrice: Double = config.sql_compute_dbu_price
     val jobsLightDBUPrice: Double = config.jobs_light_dbu_price
     val customWorkspaceName: String = config.workspace_name
-    val standardScopes = "audit,sparkEvents,jobs,clusters,clusterEvents,notebooks,pools,accounts,dbsql,notebookCommands".split(",")
+    val standardScopes = OverwatchScope.toArray
     val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
       config.excluded_scopes.getOrElse("").split(":").map(_.toLowerCase).toSet).toArray

diff --git a/src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala b/src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
index 5bf767505..5188ba0ef 100644
--- a/src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
@@ -17,6 +17,8 @@ import scala.concurrent.Future
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 import scala.concurrent.ExecutionContext.Implicits.global
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter


 /**
@@ -422,6 +424,40 @@ class Workspace(config: Config) extends SparkSessionWrapper {
     addReport
   }

+  /**
+   * Fetch warehouse event data from system.compute.warehouse_events
+   * @param fromTime start of the time window to fetch
+   * @param untilTime end of the time window to fetch
+   * @param config Overwatch config, used for the workspace (organization) id and the audit log source settings
+   * @param maxHistoryDays maximum number of days of history to look back from fromTime
+   * @return DataFrame of warehouse events for this workspace within the requested window
+   */
+  def getWarehousesEventDF(fromTime: TimeTypes,
+                           untilTime: TimeTypes,
+                           config: Config,
+                           maxHistoryDays: Int = 30
+                          ): DataFrame = {
+    val sysTableFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+    val moduleFromTime = fromTime.asLocalDateTime.format(sysTableFormat)
+    val moduleUntilTime = untilTime.asLocalDateTime.format(sysTableFormat)
+    val useSystemTableMessage = "Use system tables as a source for audit logs"
+    val tableDoesNotExistsMessage = "Table system.compute.warehouse_events does not exist"
+
+    if(config.auditLogConfig.systemTableName.isEmpty)
throw new NoNewDataException(useSystemTableMessage, Level.WARN, allowModuleProgression = false) + + if(!spark.catalog.tableExists("system.compute.warehouse_events")) + throw new NoNewDataException(tableDoesNotExistsMessage, Level.WARN, allowModuleProgression = false) + + spark.sql(s""" + select * from system.compute.warehouse_events + WHERE workspace_id = '${config.organizationId}' + and event_time >= DATE_SUB('${moduleFromTime}', ${maxHistoryDays}) + and event_time <= '${moduleUntilTime}' + """) + .withColumnRenamed("event_type","state") + .withColumnRenamed("workspace_id","organization_id") + .withColumnRenamed("event_time","timestamp") + } } diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Bronze.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Bronze.scala index 19e694208..446d55649 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/Bronze.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Bronze.scala @@ -104,8 +104,6 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config) } - - lazy private[overwatch] val jobsSnapshotModule = Module(1001, "Bronze_Jobs_Snapshot", this) lazy private val appendJobsProcess: () => ETLDefinition = { () => diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/InitializerFunctions.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/InitializerFunctions.scala index 4c6ff8fb5..5d27594ae 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/InitializerFunctions.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/InitializerFunctions.scala @@ -221,6 +221,7 @@ trait InitializerFunctions case "accounts" => accounts case "dbsql" => dbsql case "notebookcommands" => notebookCommands + case "warehouseevents" => warehouseEvents case scope => { val supportedScopes = s"${OverwatchScope.values.mkString(", ")}, all" throw new BadConfigException(s"Scope $scope is not supported. 
Supported scopes include: " + diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala index cf9793bad..e4078839f 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/PipelineTargets.scala @@ -400,6 +400,16 @@ abstract class PipelineTargets(config: Config) { excludedReconColumn = Array("Timestamp") //Timestamp is the pipelineSnapTs in epoc ) + lazy private[overwatch] val warehousesStateDetailTarget: PipelineTable = PipelineTable( + name = "warehouse_state_detail_silver", + _keys = Array("warehouse_id", "state", "unixTimeMS_state_start"), + config, + _mode = WriteMode.merge, + incrementalColumns = Array("state_start_date", "unixTimeMS_state_start"), + partitionBy = Seq("organization_id", "state_start_date"), + maxMergeScanDates = 30, // 1 less than warehouseStateFact + ) + } object GoldTargets { diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/Silver.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/Silver.scala index a6007f07e..719b05923 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/Silver.scala +++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/Silver.scala @@ -30,7 +30,8 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config) SilverTargets.dbJobsStatusTarget, SilverTargets.notebookStatusTarget, SilverTargets.sqlQueryHistoryTarget, - SilverTargets.warehousesSpecTarget + SilverTargets.warehousesSpecTarget, + SilverTargets.warehousesStateDetailTarget ) } @@ -57,6 +58,7 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config) Array(sqlQueryHistoryModule, warehouseSpecModule) } + case OverwatchScope.warehouseEvents => Array(warehouseStateDetailModule) case _ => Array[Module]() } } @@ -402,6 +404,23 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config) ) } + lazy private[overwatch] val warehouseStateDetailModule = Module(2022, "Silver_WarehouseStateDetail", this, Array(1004, 1013)) + lazy private val appendWarehouseStateDetailProcess: () => ETLDefinition = { + () => + ETLDefinition( + workspace.getWarehousesEventDF(warehouseStateDetailModule.fromTime, + warehouseStateDetailModule.untilTime, + config), + Seq(buildWarehouseStateDetail( + warehouseStateDetailModule.untilTime, + BronzeTargets.auditLogsTarget.asIncrementalDF(warehouseSpecModule, BronzeTargets.auditLogsTarget.incrementalColumns,1), //Added to get the Removed Cluster, + SilverTargets.dbJobRunsTarget.asIncrementalDF(warehouseStateDetailModule, SilverTargets.dbJobRunsTarget.incrementalColumns, 30), + SilverTargets.warehousesSpecTarget + )), + append(SilverTargets.warehousesStateDetailTarget) + ) + } + private def processSparkEvents(): Unit = { executorsModule.execute(appendExecutorsProcess) @@ -433,6 +452,9 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config) jobStatusModule.execute(appendJobStatusProcess) jobRunsModule.execute(appendJobRunsProcess) } + case OverwatchScope.warehouseEvents => { + warehouseStateDetailModule.execute(appendWarehouseStateDetailProcess) + } case _ => } } diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala index 35b8e30dc..46512e749 100644 --- a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala +++ 
b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala @@ -5,7 +5,7 @@ import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms._ import com.databricks.labs.overwatch.pipeline.DbsqlTransforms._ import com.databricks.labs.overwatch.utils._ import org.apache.log4j.{Level, Logger} -import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.expressions.{Window, WindowSpec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame} @@ -487,6 +487,54 @@ trait SilverTransforms extends SparkSessionWrapper { .withColumn("gcp_attributes", SchemaTools.structToMap(clusterWithStructs, "gcp_attributes")) } + private def warehouseBase(auditLogDf: DataFrame): DataFrame = { + + val warehouse_id_gen_w = Window.partitionBy('organization_id, 'warehouse_name).orderBy('timestamp).rowsBetween(Window.currentRow, 1000) + val warehouse_name_gen_w = Window.partitionBy('organization_id, 'warehouse_id).orderBy('timestamp).rowsBetween(Window.currentRow, 1000) + val warehouse_id_gen = first('warehouse_id, true).over(warehouse_id_gen_w) + val warehouse_name_gen = first('warehouse_name, true).over(warehouse_name_gen_w) + + val warehouseSummaryCols = auditBaseCols ++ Array[Column]( + deriveWarehouseId.alias("warehouse_id"), + 'name.alias("warehouse_name"), + 'cluster_size, + 'min_num_clusters, + 'max_num_clusters, + 'auto_stop_mins, + 'spot_instance_policy, + 'enable_photon, + get_json_object('channel, "$.name").alias("channel"), + 'tags, + 'enable_serverless_compute, + 'warehouse_type + ) + + val rawAuditLogDf = auditLogDf + .filter('actionName.isin("createEndpoint", "editEndpoint", "createWarehouse", + "editWarehouse", "deleteEndpoint", "deleteWarehouse") + && responseSuccessFilter + && 'serviceName === "databrickssql") + + + if(rawAuditLogDf.isEmpty) + throw new NoNewDataException("No New Data", Level.INFO, allowModuleProgression = true) + + val auditLogDfWithStructs = rawAuditLogDf + .selectExpr("*", "requestParams.*").drop("requestParams", "Overwatch_RunID") + .select(warehouseSummaryCols: _*) + .withColumn("warehouse_id", warehouse_id_gen) + .withColumn("warehouse_name", warehouse_name_gen) + + val auditLogDfWithStructsToMap = auditLogDfWithStructs + .withColumn("tags", SchemaTools.structFromJson(spark, auditLogDfWithStructs, "tags")) + .scrubSchema + + val filteredAuditLogDf = auditLogDfWithStructsToMap + .withColumn("tags", SchemaTools.structToMap(auditLogDfWithStructsToMap, "tags")) + .withColumn("source_table",lit("audit_log_bronze")) + filteredAuditLogDf + } + protected def buildPoolsSpec( poolSnapDF: DataFrame, isFirstRun: Boolean, @@ -689,6 +737,29 @@ trait SilverTransforms extends SparkSessionWrapper { } + /** + * Function to create window spec for window functions + * @param partitionCols + * @param orderByColumn + * @param boundaryStart + * @param boundaryEnd + * @return + */ + private def createWindowSpec(partitionCols: String*) + (orderByColumn: Column) + (boundaryStart: Option[Long] = None, boundaryEnd: Option[Long] = None) + : WindowSpec = { + if (partitionCols.isEmpty) + throw new IllegalArgumentException("partitionCols cannot be empty") + if (orderByColumn.toString().isEmpty) + throw new IllegalArgumentException("orderByColumn cannot be empty") + val baseWindow = Window.partitionBy(partitionCols.map(col): _*) + if (boundaryStart.isEmpty && boundaryEnd.isEmpty) + baseWindow.orderBy(orderByColumn) + else + baseWindow.rowsBetween(boundaryStart.get, boundaryEnd.get).orderBy(orderByColumn) + } + private def 
getLatestClusterSnapAsSpecSilver(df: DataFrame,deriveClusterType: Column): DataFrame = { val latestClusterSnapW = Window.partitionBy('organization_id, 'cluster_id).orderBy('Pipeline_SnapTS.desc) df.withColumn("rnk", rank().over(latestClusterSnapW)) @@ -1015,18 +1086,26 @@ trait SilverTransforms extends SparkSessionWrapper { resultDF } + def buildClusterStateDetail( untilTime: TimeTypes, auditLogDF: DataFrame, jrsilverDF: DataFrame, clusterSpec: PipelineTable, )(clusterEventsDF: DataFrame): DataFrame = { - val stateUnboundW = Window.partitionBy('organization_id, 'cluster_id).orderBy('timestamp) - val stateFromCurrentW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(1L, 1000L).orderBy('timestamp) - val stateUntilCurrentW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(-1000L, -1L).orderBy('timestamp) - val stateUntilPreviousRowW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(Window.unboundedPreceding, -1L).orderBy('timestamp) - val uptimeW = Window.partitionBy('organization_id, 'cluster_id, 'reset_partition).orderBy('unixTimeMS_state_start) - val orderingWindow = Window.partitionBy('organization_id, 'cluster_id).orderBy(desc("timestamp")) +// val stateUnboundW = Window.partitionBy('organization_id, 'cluster_id).orderBy('timestamp) +// val stateFromCurrentW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(1L, 1000L).orderBy('timestamp) +// val stateUntilCurrentW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(-1000L, -1L).orderBy('timestamp) +// val stateUntilPreviousRowW = Window.partitionBy('organization_id, 'cluster_id).rowsBetween(Window.unboundedPreceding, -1L).orderBy('timestamp) +// val uptimeW = Window.partitionBy('organization_id, 'cluster_id, 'reset_partition).orderBy('unixTimeMS_state_start) +// val orderingWindow = Window.partitionBy('organization_id, 'cluster_id).orderBy(desc("timestamp")) + + val stateUnboundW = createWindowSpec("organization_id", "cluster_id")('timestamp)() + val stateFromCurrentW = createWindowSpec("organization_id", "cluster_id")('timestamp)(Some(1L), Some(1000L)) + val stateUntilCurrentW = createWindowSpec("organization_id", "cluster_id")('timestamp)(Some(-1000L), Some(-1L)) + val stateUntilPreviousRowW = createWindowSpec("organization_id", "cluster_id")('timestamp)(Some(Window.unboundedPreceding), Some(-1L)) + val uptimeW = createWindowSpec("organization_id", "cluster_id", "reset_partition")('unixTimeMS_state_start)() + val orderingWindow = createWindowSpec("organization_id", "cluster_id")(col("timestamp").desc)() val nonBillableTypes = Array( "STARTING", "TERMINATING", "CREATING", "RESTARTING" , "TERMINATING_IMPUTED" @@ -1427,4 +1506,232 @@ trait SilverTransforms extends SparkSessionWrapper { .transform(deriveWarehouseBase()) .transform(deriveWarehouseBaseFilled(isFirstRun, bronzeWarehouseSnapLatest, silver_warehouse_spec)) } + + protected def buildWarehouseStateDetail( + untilTime: TimeTypes, + auditLogDF: DataFrame, + jrsilverDF: DataFrame, + warehousesSpec: PipelineTable, + )(warehouseEventsDF: DataFrame): DataFrame = { + +// val stateUnboundW = Window.partitionBy('organization_id, 'warehouse_id).orderBy('timestamp) +// val stateFromCurrentW = Window.partitionBy('organization_id, 'warehouse_id).rowsBetween(1L, 1000L).orderBy('timestamp) +// val stateUntilCurrentW = Window.partitionBy('organization_id, 'warehouse_id).rowsBetween(-1000L, -1L).orderBy('timestamp) +// val stateUntilPreviousRowW = Window.partitionBy('organization_id, 
'warehouse_id).rowsBetween(Window.unboundedPreceding, -1L).orderBy('timestamp) +// val uptimeW = Window.partitionBy('organization_id, 'warehouse_id, 'state_transition_flag_sum).orderBy('unixTimeMS_state_start) +// val orderingWindow = Window.partitionBy('organization_id, 'warehouse_id).orderBy(desc("timestamp")) + + val stateUnboundW = createWindowSpec("organization_id", "warehouse_id")(col("timestamp"))() + val stateFromCurrentW = createWindowSpec("organization_id", "warehouse_id")(col("timestamp"))(Some(1L), Some(1000L)) + val stateUntilCurrentW = createWindowSpec("organization_id", "warehouse_id")(col("timestamp"))(Some(-1000L), Some(-1L)) + val stateUntilPreviousRowW = createWindowSpec("organization_id", "warehouse_id")(col("timestamp"))(Some(Window.unboundedPreceding), Some(-1L)) + val uptimeW = createWindowSpec("organization_id", "warehouse_id", "state_transition_flag_sum")(col("unixTimeMS_state_start"))() + val orderingWindow = createWindowSpec("organization_id", "warehouse_id")(col("timestamp").desc)() + + + val nonBillableTypes = Array( + "STARTING", "TERMINATING", "CREATING", "RESTARTING" , "TERMINATING_IMPUTED" + ,"STOPPING","STOPPED" //new states from warehouse_events + ) + + val runningStates = Array( + "STARTING", "INIT_SCRIPTS_STARTED", "RUNNING", "CREATING", + "RESIZING", "UPSIZE_COMPLETED", "DRIVER_HEALTHY" + ,"SCALED_UP","SCALED_DOWN" //new states from warehouse_events + ) + + val invalidEventChain = lead('runningSwitch, 1).over(stateUnboundW).isNotNull && lead('runningSwitch, 1) + .over(stateUnboundW) === lead('previousSwitch, 1).over(stateUnboundW) + + val warehouseEventsFinal = if (jrsilverDF.isEmpty || warehousesSpec.asDF.isEmpty) { + warehouseEventsDF // need to add "min_num_clusters","max_num_clusters" + .withColumn("min_num_clusters",lit(0)) + .withColumn("max_num_clusters",lit(0)) + .withColumn("timestamp", unix_timestamp($"timestamp")*1000) + }else{ + val refinedWarehouseEventsDFFiltered = warehouseEventsDF + .withColumn("row", row_number().over(orderingWindow)) + .filter('state =!= "TERMINATING" && 'row === 1) + + val jrSilverAgg= jrsilverDF + .filter('clusterType === "sqlWarehouse") + .groupBy("clusterID") + .agg(max("TaskExecutionRunTime.endTS").alias("end_run_time")) + .withColumnRenamed("clusterID","warehouseId") + .filter('end_run_time.isNotNull) + + val joined = refinedWarehouseEventsDFFiltered.join(jrSilverAgg, + refinedWarehouseEventsDFFiltered("warehouse_id") === jrSilverAgg("warehouseId"), "inner") + .withColumn("state", lit("TERMINATING_IMPUTED")) // check if STOPPING_IMPUTED can be used ? 
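+
+      // Impute a closing state: for each warehouse whose most recent event is not TERMINATING
+      // but which has completed sqlWarehouse job runs in the job-run silver table, tag that
+      // latest event as TERMINATING_IMPUTED. Below, the imputed rows are enriched with
+      // min/max cluster counts from the warehouse spec and unioned back onto the raw
+      // warehouse_events stream before timestamps are converted to unix milliseconds.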
+ + // Join with Cluster Spec to get filter on automated cluster + val warehousesSpecDF = warehousesSpec.asDF + .select("warehouse_id","warehouse_name","min_num_clusters","max_num_clusters") + .dropDuplicates() + + val jobClusterImputed = joined.join(warehousesSpecDF,Seq("warehouse_id"),"inner") + .drop("row","warehouseId","end_run_time","warehouse_name") + + warehouseEventsDF +// .withColumn("min_num_clusters",lit(0)) +// .withColumn("max_num_clusters",lit(0)) + .unionByName(jobClusterImputed, allowMissingColumns = true) + .withColumn("timestamp", unix_timestamp($"timestamp")*1000) // need to add "min_num_clusters","max_num_clusters" + } + + val warehouseBaseDF = warehouseBase(auditLogDF) + + val warehouseBaseDF_latest = warehouseBaseDF + .withColumn("row_num",row_number().over(stateUnboundW)) + .filter('row_num === 1).dropDupColumnByAlias("row_num") + .select("organization_id","warehouse_id","max_num_clusters","min_num_clusters") + .withColumnRenamed("max_num_clusters","warehouse_max_num_clusters") + .withColumnRenamed("min_num_clusters","warehouse_min_num_clusters") + + val warehouseEventsDerived = warehouseEventsFinal.join( + warehouseBaseDF_latest + ,Seq("organization_id","warehouse_id"),"left" + ) + .withColumn("max_num_clusters",when('max_num_clusters === 0,'warehouse_max_num_clusters).otherwise('max_num_clusters)) + // .withColumn("min_num_clusters",when('min_num_clusters === 0,'cluster_count).otherwise('min_num_clusters)) + + val warehouseEventsBaseline = warehouseEventsDerived //warehouseEventsFinal + .withColumn( + "runningSwitch", + when('state.isin("TERMINATING","TERMINATING_IMPUTED","STOPPING"), lit(false)) //added STOPPING + .when('state.isin("CREATING", "STARTING"), lit(true)) + .otherwise(lit(null).cast("boolean"))) + .withColumn( + "previousSwitch", + when('runningSwitch.isNotNull, last('runningSwitch, true).over(stateUntilPreviousRowW)) + ) + .withColumn( + "invalidEventChainHandler", + when(invalidEventChain, array(lit(false), lit(true))).otherwise(array(lit(false))) + ) + .selectExpr("*", "explode(invalidEventChainHandler) as imputedTerminationEvent").drop("invalidEventChainHandler") + .withColumn("state", when('imputedTerminationEvent, "STOPPING").otherwise('state)) // replaced TERMINATED with STOPPING + .withColumn("timestamp", when('imputedTerminationEvent, lag('timestamp, 1).over(stateUnboundW) + 1L).otherwise('timestamp)) + .withColumn("lastRunningSwitch", last('runningSwitch, true).over(stateUntilCurrentW)) // previous on/off switch + .withColumn("nextRunningSwitch", first('runningSwitch, true).over(stateFromCurrentW)) // next on/off switch + // given no anomaly, set on/off state to current state + // if no current state use previous state + // if no previous state found, assume opposite of next state switch + .withColumn("isRunning", coalesce( + when('imputedTerminationEvent, lit(false)).otherwise(lit(null).cast("boolean")), + 'runningSwitch, + 'lastRunningSwitch, + !'nextRunningSwitch + )) + // if isRunning still undetermined, use guaranteed events to create state anchors to identify isRunning anchors + .withColumn("isRunning", when('isRunning.isNull && 'state.isin(runningStates: _*), lit(true)).otherwise('isRunning)) + // use the anchors to fill in the null gaps between the state changes to determine if running + // if ultimately unable to be determined, assume not isRunning + .withColumn("isRunning", coalesce( + when('isRunning.isNull, last('isRunning, true).over(stateUntilCurrentW)).otherwise('isRunning), + when('isRunning.isNull, !first('isRunning, 
true).over(stateFromCurrentW)).otherwise('isRunning), + lit(false) + )).drop("lastRunningSwitch", "nextRunningSwitch") + .withColumn("previousIsRunning",lag($"isRunning", 1, null).over(stateUnboundW)) + .withColumn("isRunning",when(col("previousIsRunning") === "false" && col("state") === "EXPANDED_DISK",lit(false)).otherwise('isRunning)) + .drop("previousIsRunning") + .withColumn( + "current_num_clusters", + coalesce( + when(!'isRunning || 'isRunning.isNull, lit(null).cast("long")) + .otherwise( + coalesce( // get current_num_workers no matter where the value is stored based on business rules + 'cluster_count, + 'min_num_clusters, + last(coalesce( // look for the last non-null value when current value isn't present + 'cluster_count, + 'min_num_clusters + ), true).over(stateUntilCurrentW) + ) + ), + lit(0) // don't allow null returns + ) + ) + .withColumn( + "target_num_clusters", // need to check this logic and rename column + coalesce( + when(!'isRunning || 'isRunning.isNull, lit(null).cast("long")) + .when('state === "CREATING", + coalesce('min_num_clusters, 'current_num_clusters)) + .otherwise(coalesce('max_num_clusters, 'current_num_clusters)), + lit(0) // don't allow null returns + ) + ) + .select( + 'organization_id, 'warehouse_id, 'isRunning, + 'timestamp, 'state, 'current_num_clusters, 'target_num_clusters + ) + .withColumn("unixTimeMS_state_start", 'timestamp) + .withColumn("unixTimeMS_state_end", coalesce( // if state end open, use pipelineSnapTime, will be merged when state end is received + lead('timestamp, 1).over(stateUnboundW) - lit(1), // subtract 1 millis + lit(untilTime.asUnixTimeMilli) + )) + + // Start changes from here + // Get the warehouseID that has been Permenantly_Deleted + + val removedWarehouseID = warehouseBaseDF + .filter('actionName.isin("deleteEndpoint")) + .select('warehouse_id,'timestamp.alias("deletion_timestamp")).distinct() + + val warehouseEventsBaselineForRemovedCluster = warehouseEventsBaseline.join(removedWarehouseID,Seq("warehouse_id")) + + val window = Window.partitionBy('organization_id, 'warehouse_id).orderBy('timestamp.desc) + val stateBeforeRemoval = warehouseEventsBaselineForRemovedCluster + .withColumn("rnk",rank().over(window)) + .withColumn("rn", row_number().over(window)) + .withColumn("unixTimeMS_state_end",when('state.isin("STOPPING","TERMINATING_IMPUTED"),'unixTimeMS_state_end).otherwise('deletion_timestamp)) + .filter('rnk === 1 && 'rn === 1).drop("rnk", "rn") + + val stateDuringRemoval = stateBeforeRemoval + .withColumn("timestamp",when('state.isin("STOPPING","TERMINATING","TERMINATING_IMPUTED"),'unixTimeMS_state_end+1).otherwise(col("deletion_timestamp")+1)) + .withColumn("isRunning",lit(false)) + .withColumn("unixTimeMS_state_start",('timestamp)) + .withColumn("unixTimeMS_state_end",('timestamp)) + .withColumn("state",lit("PERMENANT_DELETE")) + .withColumn("current_num_clusters",lit(0)) + .withColumn("target_num_clusters",lit(0)) + .drop("deletion_timestamp") + + val columns: Array[String] = warehouseEventsBaseline.columns + val stateDuringRemovalFinal = stateBeforeRemoval.drop("deletion_timestamp") + .unionByName(stateDuringRemoval, allowMissingColumns = true) + .select(columns.map(col): _*) + + val warehouseEventsBaselineFinal = warehouseEventsBaseline.join(stateDuringRemovalFinal,Seq("warehouse_id","timestamp"),"anti") + .select(columns.map(col): _*) + .unionByName(stateDuringRemovalFinal, allowMissingColumns = true) + + warehouseEventsBaselineFinal + .withColumn("state_transition_flag", + when( + lag('state, 
1).over(stateUnboundW).isin("STOPPING","TERMINATING", "RESTARTING", "EDITED","TERMINATING_IMPUTED") || + !'isRunning, lit(1) + ).otherwise(lit(0)) + ) + .withColumn("state_transition_flag_sum", sum('state_transition_flag).over(stateUnboundW)) + .withColumn("target_num_clusters", last('target_num_clusters, true).over(stateUnboundW)) + .withColumn("current_num_clusters", last('current_num_clusters, true).over(stateUnboundW)) + .withColumn("timestamp_state_start", from_unixtime('unixTimeMS_state_start.cast("double") / lit(1000)).cast("timestamp")) + .withColumn("timestamp_state_end", from_unixtime('unixTimeMS_state_end.cast("double") / lit(1000)).cast("timestamp")) // subtract 1.0 millis + .withColumn("state_start_date", 'timestamp_state_start.cast("date")) + .withColumn("uptime_in_state_S", ('unixTimeMS_state_end - 'unixTimeMS_state_start) / lit(1000)) + .withColumn("uptime_since_restart_S", + coalesce( + when('state_transition_flag === 1, lit(0)) + .otherwise(sum('uptime_in_state_S).over(uptimeW)), + lit(0) + ) + ) + .withColumn("cloud_billable", 'isRunning) + .withColumn("databricks_billable", 'isRunning && !'state.isin(nonBillableTypes: _*)) + .withColumn("uptime_in_state_H", 'uptime_in_state_S / lit(3600)) + .withColumn("state_dates", sequence('timestamp_state_start.cast("date"), 'timestamp_state_end.cast("date"))) + .withColumn("days_in_state", size('state_dates)) + } } diff --git a/src/main/scala/com/databricks/labs/overwatch/utils/Config.scala b/src/main/scala/com/databricks/labs/overwatch/utils/Config.scala index 054695eac..0ec4a61c0 100644 --- a/src/main/scala/com/databricks/labs/overwatch/utils/Config.scala +++ b/src/main/scala/com/databricks/labs/overwatch/utils/Config.scala @@ -160,7 +160,8 @@ class Config() { private[overwatch] def orderedOverwatchScope: Seq[OverwatchScope.Value] = { import OverwatchScope._ // jobs, clusters, clusterEvents, sparkEvents, pools, audit, passthrough, profiles - Seq(audit, notebooks, accounts, pools, clusters, clusterEvents, sparkEvents, jobs, dbsql,notebookCommands) + Seq(audit, notebooks, accounts, pools, clusters, clusterEvents, sparkEvents, jobs, + dbsql, warehouseEvents, notebookCommands) } def overwatchScope: Seq[OverwatchScope.Value] = _overwatchScope diff --git a/src/main/scala/com/databricks/labs/overwatch/utils/Structures.scala b/src/main/scala/com/databricks/labs/overwatch/utils/Structures.scala index f32326903..2037015f3 100644 --- a/src/main/scala/com/databricks/labs/overwatch/utils/Structures.scala +++ b/src/main/scala/com/databricks/labs/overwatch/utils/Structures.scala @@ -402,8 +402,10 @@ case class SanitizeFieldException(field: StructField, rules: List[SanitizeRule], object OverwatchScope extends Enumeration { type OverwatchScope = Value - val jobs, clusters, clusterEvents, sparkEvents, audit, notebooks, accounts, dbsql, pools, notebookCommands = Value + val jobs, clusters, clusterEvents, sparkEvents, audit, notebooks, accounts, + dbsql, pools, notebookCommands, warehouseEvents = Value // Todo Issue_77 + def toArray: Array[String] = values.map(_.toString).toArray } object WriteMode extends Enumeration {
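
Example (not part of the patch): a minimal, self-contained sketch of how the new
warehouseEvents scope flows through the scope filtering that MultiWorkspaceDeployment
now derives from OverwatchScope.toArray instead of a hard-coded list. The excludedScopes
value and the ScopeFilterSketch object name are illustrative only.

object ScopeFilterSketch extends App {
  // Mirrors OverwatchScope after this patch: warehouseEvents is a first-class scope
  object OverwatchScope extends Enumeration {
    val jobs, clusters, clusterEvents, sparkEvents, audit, notebooks, accounts,
    dbsql, pools, notebookCommands, warehouseEvents = Value
    def toArray: Array[String] = values.map(_.toString).toArray
  }

  // Illustrative excluded_scopes value as it would appear in a deployment config (colon-delimited)
  val excludedScopes = "notebookCommands:dbsql"

  // Same filtering logic as MultiWorkspaceDeployment after the change
  val standardScopes = OverwatchScope.toArray
  val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
    excludedScopes.split(":").map(_.toLowerCase).toSet).toArray

  // warehouseevents remains in scope unless explicitly excluded
  println(scopesToExecute.sorted.mkString(", "))
}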