Skip to content

Commit

Permalink
1218 warehouse state details (#1254)
Browse files Browse the repository at this point in the history
* test

* code for warehouse_state_detail_silver

* removed comments

* adding warehouseEvents scope

* added exception for table not found

* added exception to check whether system tables are being used

* enhance function getWarehousesEventDF

* added code to fix max number of clusters

* change in column names

* refactored code
  • Loading branch information
aman-db committed Aug 12, 2024
1 parent 59daae5 commit 3055a22
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
val sqlComputerDBUPrice: Double = config.sql_compute_dbu_price
val jobsLightDBUPrice: Double = config.jobs_light_dbu_price
val customWorkspaceName: String = config.workspace_name
val standardScopes = "audit,sparkEvents,jobs,clusters,clusterEvents,notebooks,pools,accounts,dbsql,notebookCommands".split(",")
val standardScopes = OverwatchScope.toArray
val scopesToExecute = (standardScopes.map(_.toLowerCase).toSet --
config.excluded_scopes.getOrElse("").split(":").map(_.toLowerCase).toSet).toArray

Expand Down
36 changes: 36 additions & 0 deletions src/main/scala/com/databricks/labs/overwatch/env/Workspace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import scala.concurrent.Future
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Success, Try}
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter


/**
Expand Down Expand Up @@ -422,6 +424,40 @@ class Workspace(config: Config) extends SparkSessionWrapper {
addReport
}

/**
 * Fetch warehouse event data from the `system.compute.warehouse_events` system table
 * for this workspace, bounded by the module's time window.
 *
 * Requires that the deployment is configured to use system tables as the audit-log
 * source; otherwise the module is halted via [[NoNewDataException]].
 *
 * @param fromTime from time (inclusive lower bound, minus `maxHistoryDays` look-back)
 * @param untilTime until time (inclusive upper bound)
 * @param config workspace configuration (supplies organization id and audit-log source)
 * @param maxHistoryDays maximum number of days of history to look back before `fromTime`
 * @return DataFrame of warehouse events with columns renamed to Overwatch conventions
 *         (state, organization_id, timestamp)
 */
def getWarehousesEventDF(fromTime: TimeTypes,
                         untilTime: TimeTypes,
                         config: Config,
                         maxHistoryDays: Int = 30
                        ): DataFrame = {
  // system.compute.warehouse_events stores event_time with millisecond precision
  val sysTableFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
  val moduleFromTime = fromTime.asLocalDateTime.format(sysTableFormat)
  val moduleUntilTime = untilTime.asLocalDateTime.format(sysTableFormat)
  val useSystemTableMessage = "Use system tables as a source to audit logs"
  // fixed: "does not exists" -> "does not exist"
  val tableDoesNotExistMessage = "Table system.compute.warehouse_events does not exist"

  // Warehouse events are only exposed through system tables; if the deployment is not
  // configured with a system-table audit source, halt this module without progressing it.
  if (config.auditLogConfig.systemTableName.isEmpty)
    throw new NoNewDataException(useSystemTableMessage, Level.WARN, allowModuleProgression = false)

  // Guard against workspaces where the system table has not been enabled/shared.
  if (!spark.catalog.tableExists("system.compute.warehouse_events"))
    throw new NoNewDataException(tableDoesNotExistMessage, Level.WARN, allowModuleProgression = false)

  // DATE_SUB widens the lower bound by maxHistoryDays so that events belonging to
  // long-running warehouse states that began before fromTime are still captured.
  spark.sql(s"""
  select * from system.compute.warehouse_events
  WHERE workspace_id = '${config.organizationId}'
  and event_time >= DATE_SUB('${moduleFromTime}', ${maxHistoryDays})
  and event_time <= '${moduleUntilTime}'
  """)
    // Rename system-table columns to the Overwatch silver-layer naming convention.
    .withColumnRenamed("event_type","state")
    .withColumnRenamed("workspace_id","organization_id")
    .withColumnRenamed("event_time","timestamp")
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)

}



lazy private[overwatch] val jobsSnapshotModule = Module(1001, "Bronze_Jobs_Snapshot", this)
lazy private val appendJobsProcess: () => ETLDefinition = {
() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ trait InitializerFunctions
case "accounts" => accounts
case "dbsql" => dbsql
case "notebookcommands" => notebookCommands
case "warehouseevents" => warehouseEvents
case scope => {
val supportedScopes = s"${OverwatchScope.values.mkString(", ")}, all"
throw new BadConfigException(s"Scope $scope is not supported. Supported scopes include: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,16 @@ abstract class PipelineTargets(config: Config) {
excludedReconColumn = Array("Timestamp") //Timestamp is the pipelineSnapTs in epoc
)

// Silver target for warehouse state detail derived from system.compute.warehouse_events.
// Merge-mode target keyed on (warehouse_id, state, unixTimeMS_state_start) so re-runs
// upsert rather than duplicate state rows.
lazy private[overwatch] val warehousesStateDetailTarget: PipelineTable = PipelineTable(
name = "warehouse_state_detail_silver",
_keys = Array("warehouse_id", "state", "unixTimeMS_state_start"),
config,
_mode = WriteMode.merge,
// Incremental loads filter on these columns; state_start_date also drives partitioning.
incrementalColumns = Array("state_start_date", "unixTimeMS_state_start"),
partitionBy = Seq("organization_id", "state_start_date"),
maxMergeScanDates = 30, // 1 less than warehouseStateFact
)

}

object GoldTargets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
SilverTargets.dbJobsStatusTarget,
SilverTargets.notebookStatusTarget,
SilverTargets.sqlQueryHistoryTarget,
SilverTargets.warehousesSpecTarget
SilverTargets.warehousesSpecTarget,
SilverTargets.warehousesStateDetailTarget
)
}

Expand All @@ -57,6 +58,7 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
Array(sqlQueryHistoryModule,
warehouseSpecModule)
}
case OverwatchScope.warehouseEvents => Array(warehouseStateDetailModule)
case _ => Array[Module]()
}
}
Expand Down Expand Up @@ -402,6 +404,23 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
)
}

// Module 2022: builds warehouse_state_detail_silver. Depends on bronze modules
// 1004 (audit logs) and 1013 (warehouse events source) per the Array passed here.
lazy private[overwatch] val warehouseStateDetailModule = Module(2022, "Silver_WarehouseStateDetail", this, Array(1004, 1013))
// ETL: source warehouse events from system tables for this module's window, then
// transform via buildWarehouseStateDetail and merge into warehousesStateDetailTarget.
lazy private val appendWarehouseStateDetailProcess: () => ETLDefinition = {
  () =>
    ETLDefinition(
      workspace.getWarehousesEventDF(warehouseStateDetailModule.fromTime,
        warehouseStateDetailModule.untilTime,
        config),
      Seq(buildWarehouseStateDetail(
        warehouseStateDetailModule.untilTime,
        // NOTE(review): audit logs are windowed by warehouseSpecModule, not
        // warehouseStateDetailModule — confirm this is intentional and not a copy-paste slip.
        BronzeTargets.auditLogsTarget.asIncrementalDF(warehouseSpecModule, BronzeTargets.auditLogsTarget.incrementalColumns,1), //Added to get the Removed Cluster,
        // 30-day look-back on job runs to associate runs with warehouse states.
        SilverTargets.dbJobRunsTarget.asIncrementalDF(warehouseStateDetailModule, SilverTargets.dbJobRunsTarget.incrementalColumns, 30),
        SilverTargets.warehousesSpecTarget
      )),
      append(SilverTargets.warehousesStateDetailTarget)
    )
}

private def processSparkEvents(): Unit = {

executorsModule.execute(appendExecutorsProcess)
Expand Down Expand Up @@ -433,6 +452,9 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
jobStatusModule.execute(appendJobStatusProcess)
jobRunsModule.execute(appendJobRunsProcess)
}
case OverwatchScope.warehouseEvents => {
warehouseStateDetailModule.execute(appendWarehouseStateDetailProcess)
}
case _ =>
}
}
Expand Down
Loading

0 comments on commit 3055a22

Please sign in to comment.