Reading mount source from CSV implemented #695
@@ -1,5 +1,6 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.env.Database
import com.databricks.labs.overwatch.eventhubs.AadAuthInstance
import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms.{workflowsCleanseJobClusters, workflowsCleanseTasks}
@@ -897,17 +898,17 @@ trait BronzeTransforms extends SparkSessionWrapper {

  private[overwatch] def getAllEventLogPrefix(inputDataframe: DataFrame, apiEnv: ApiEnv): DataFrame = {
    try {
      // Getting the mount info from the API (or mapping CSV) and cleaning the data
      val mountMap = getMountPointMapping(apiEnv)
        .withColumn("mount_point", when('mount_point.endsWith("/"), 'mount_point.substr(lit(0), length('mount_point) - 1)).otherwise('mount_point))
        .withColumn("source", when('source.endsWith("/"), 'source.substr(lit(0), length('source) - 1)).otherwise('source))
        .filter(col("mount_point") =!= "/")

Review comment (on the trailing-"/" trimming above): It's a different issue. When the source-to-mount mapping is fetched using dbutils, the data present in the CSV contains a "/" at the end, hence some of the joins were failing. #699 covers the "/" issue only in the apiURL.
Reply: Ah, I see, thanks for clarifying.

      // Cleaning the data for the cluster log path
      val formattedInputDf = inputDataframe
        .withColumn("cluster_log_conf", when('cluster_log_conf.endsWith("/"), 'cluster_log_conf.substr(lit(0), length('cluster_log_conf) - 1)).otherwise('cluster_log_conf))
        .withColumn("cluster_mount_point_temp", regexp_replace('cluster_log_conf, "dbfs:", ""))
        .withColumn("cluster_mount_point", 'cluster_mount_point_temp)
        // .withColumn("cluster_mount_point", regexp_replace('cluster_mount_point_temp, "//", "/"))

      // Joining the cluster log data with the mount point data
      val joinDF = formattedInputDf
        .join(mountMap, formattedInputDf.col("cluster_mount_point").startsWith(mountMap.col("mount_point")), "left") // starts with, then when
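The trailing-slash handling above is subtle, so here is a small standalone sketch of what it does (not part of the PR; the sample values are made up). It relies on Spark treating a substring start position of 0 the same as 1, so substr(lit(0), length(col) - 1) simply drops the last character:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, lit, when}

object TrailingSlashExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("trailing-slash-demo").getOrCreate()
  import spark.implicits._

  // Two mount points, one with and one without a trailing "/"
  val df = Seq("/mnt/cluster-logs/", "/mnt/cluster-logs").toDF("mount_point")

  df.withColumn(
      "mount_point",
      when('mount_point.endsWith("/"), 'mount_point.substr(lit(0), length('mount_point) - 1))
        .otherwise('mount_point))
    .show(truncate = false)
  // Both rows print as /mnt/cluster-logs, so the startsWith join in getAllEventLogPrefix
  // no longer misses matches because of a trailing slash.

  spark.stop()
}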
@@ -925,11 +926,35 @@ trait BronzeTransforms extends SparkSessionWrapper {

      val result = pathsDF.select('wildPrefix, 'cluster_id)
      result
    } catch {
      case e: Exception =>
        logger.log(Level.ERROR, "Unable to get all the event log prefix", e)
        throw e
    }
  }

Review comment (on the catch block): Let's log the exception here.
Reply: Done.

  private def getMountPointMapping(apiEnv: ApiEnv): DataFrame = {
-   val endPoint = "dbfs/search-mounts"
-   ApiCallV2(apiEnv, endPoint).execute().asDF()
+   try {
+     if (apiEnv.mountMappingPath.nonEmpty) {
+       logger.log(Level.INFO, "Reading cluster logs from " + apiEnv.mountMappingPath)
+       spark.read.option("header", "true")
+         .option("ignoreLeadingWhiteSpace", true)
+         .option("ignoreTrailingWhiteSpace", true)
+         .csv(apiEnv.mountMappingPath.get)
+         .withColumnRenamed("mountPoint", "mount_point")
+         .select("mount_point", "source")
+     } else {
+       logger.log(Level.INFO, "Calling dbfs/search-mounts for cluster logs")
+       val endPoint = "dbfs/search-mounts"
+       ApiCallV2(apiEnv, endPoint).execute().asDF()
+     }
+   } catch {
+     case e: Exception =>
+       logger.log(Level.ERROR, "ERROR while reading mount point", e)
+       throw e
+   }
  }
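For reference, here is a minimal self-contained sketch of the kind of mapping file mountMappingPath is expected to point to and how it is read. The header names "mountPoint" and "source" are inferred from the withColumnRenamed/select calls above; the local path and the storage URIs are made-up examples.

import org.apache.spark.sql.SparkSession
import java.nio.file.{Files, Paths}

object MountMappingCsvExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("mount-mapping-csv-demo").getOrCreate()

  // A tiny example mapping file: one row per mount point and its cloud storage source
  val csvPath = "/tmp/mount_mapping_example.csv"
  val sample =
    """mountPoint,source
      |/mnt/cluster-logs/,abfss://logs@examplestorage.dfs.core.windows.net/cluster-logs/
      |/mnt/landing,s3a://example-bucket/landing
      |""".stripMargin
  Files.write(Paths.get(csvPath), sample.getBytes("UTF-8"))

  // Same read options and renames as getMountPointMapping above
  val mountMap = spark.read.option("header", "true")
    .option("ignoreLeadingWhiteSpace", true)
    .option("ignoreTrailingWhiteSpace", true)
    .csv(csvPath)
    .withColumnRenamed("mountPoint", "mount_point")
    .select("mount_point", "source")

  mountMap.show(truncate = false)
  spark.stop()
}

Note that the trailing slashes in the sample rows are harmless, because getAllEventLogPrefix strips them before the join.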
@@ -941,7 +966,8 @@ trait BronzeTransforms extends SparkSessionWrapper {
                             clusterSnapshotTable: PipelineTable,
                             sparkLogClusterScaleCoefficient: Double,
                             apiEnv: ApiEnv,
-                            isMultiWorkSpaceDeployment: Boolean
+                            isMultiWorkSpaceDeployment: Boolean,
+                            organisationId: String
                            )(incrementalAuditDF: DataFrame): DataFrame = {

    logger.log(Level.INFO, "Collecting Event Log Paths Glob. This can take a while depending on the " +

@@ -985,7 +1011,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
    // Build root level eventLog path prefix from clusterID and log conf
    // /some/log/prefix/cluster_id/eventlog
    val allEventLogPrefixes =
-     if (isMultiWorkSpaceDeployment) {
+     if (isMultiWorkSpaceDeployment && organisationId != Initializer.getOrgId) {
        getAllEventLogPrefix(newLogDirsNotIdentifiedInAudit
          .unionByName(incrementalClusterWLogging), apiEnv).select('wildPrefix).distinct()
      } else {

@@ -49,7 +49,8 @@ case class ApiEnv(
                   proxyPort: Option[Int] = None,
                   proxyUserName: Option[String] = None,
                   proxyPasswordScope: Option[String] = None,
-                  proxyPasswordKey: Option[String] = None
+                  proxyPasswordKey: Option[String] = None,
+                  mountMappingPath: Option[String] = None
                 )
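As a quick orientation for readers of the config change, a purely illustrative sketch (assumed names and a made-up path, not Overwatch code) of how the new mountMappingPath option switches the behaviour of getMountPointMapping:

object MountMappingPathBranch extends App {
  // Mirrors the if (mountMappingPath.nonEmpty) branch in getMountPointMapping
  def mountMappingSource(mountMappingPath: Option[String]): String =
    if (mountMappingPath.nonEmpty) s"read mapping CSV at ${mountMappingPath.get}"
    else "call the dbfs/search-mounts API"

  println(mountMappingSource(Some("dbfs:/configs/overwatch/mount_mapping.csv"))) // CSV branch
  println(mountMappingSource(None))                                              // API branch
}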
@@ -59,7 +60,8 @@ case class ApiEnvConfig(
                         enableUnsafeSSL: Boolean = false,
                         threadPoolSize: Int = 4,
                         apiWaitingTime: Long = 300000,
-                        apiProxyConfig: Option[ApiProxyConfig] = None
+                        apiProxyConfig: Option[ApiProxyConfig] = None,
+                        mountMappingPath: Option[String] = None
                       )

case class ApiProxyConfig(
@@ -101,6 +103,7 @@ case class MultiWorkspaceConfig(workspace_name: String,
                                 enable_unsafe_SSL: Option[Boolean] = None,
                                 thread_pool_size: Option[Int] = None,
                                 api_waiting_time: Option[Long] = None,
+                                mount_mapping_path: String,
                                 deployment_id: String,
                                 output_path: String
                                )

Review comment (on mount_mapping_path): This is Option[String] everywhere else. Do you have a default string when instantiating ApiProxyConfig if it's not present, through getOrElse or something? I didn't see that.
Reply: Nah, this is fine, thanks for explaining.

Review comment: Did you add some validation for this? When this parameter is passed, the >50 mount validation should be disabled and the mapping validation should be enabled. Is this done?
Reply: No, I thought of it, but some of the code is in #713 and some is in #695, so I was waiting for these two PRs to get merged so that I can enhance it and add the validation part.
Reply: OK, will merge this now and #713 as soon as your review is complete, and you can address the validator logic comments in a new PR after everything is merged. Thanks.
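For context only, a hypothetical sketch of what that validator logic could look like once the PRs are merged. Nothing below is part of #695 or #713; the rule (skip the >50-mount check when a mapping file is supplied, otherwise keep it) is taken from the comment thread above, and the column names follow the CSV read in this PR:

import org.apache.spark.sql.SparkSession

object MountValidationSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("mount-validation-sketch").getOrCreate()

  // Hypothetical validator: when mount_mapping_path is supplied, validate the mapping file
  // instead of enforcing the >50 mount-point limit.
  def validateMountInfo(mountMappingPath: Option[String], mountCount: => Long): Unit =
    mountMappingPath match {
      case Some(path) =>
        // Mapping validation: the file must be readable and expose the expected columns
        val cols = spark.read.option("header", "true").csv(path).columns.toSet
        val missing = Set("mountPoint", "source").diff(cols)
        require(missing.isEmpty, s"Mount mapping file $path is missing columns: ${missing.mkString(", ")}")
      case None =>
        // Original guard: without a mapping file, workspaces with >50 mounts are rejected
        require(mountCount <= 50, "Workspace has more than 50 mount points; provide mount_mapping_path instead")
    }

  spark.stop()
}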