Reading mount source from CSV implemented #695

Merged · 4 commits · Feb 20, 2023
Changes from all commits
@@ -166,7 +166,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
      config.enable_unsafe_SSL.getOrElse(false),
      config.thread_pool_size.getOrElse(4),
      config.api_waiting_time.getOrElse(300000),
-     Some(apiProxyConfig))
+     Some(apiProxyConfig),
+     Some(config.mount_mapping_path))
Contributor: Did you add some validation for this? When this parameter is passed, the >50 mount validation should be disabled and the mapping validation should be enabled. Is this done?

Contributor (author): No, I thought of it, but some of the code is in #713 and some is in #695, so I was waiting for those two PRs to be merged before adding the validation.

Contributor: OK, I'll merge this now, and #713 as soon as your review is complete; you can then add the validator logic in a new PR once everything is merged. Thanks.

apiEnvConfig
}

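A minimal sketch of what that deferred validation could look like, assuming access to the workspace's mount count and the spark session from SparkSessionWrapper; the method name, threshold handling, and messages are illustrative and not part of this PR:

// Hypothetical validator, to be added in a follow-up PR (see discussion above).
private def validateMountSource(config: MultiWorkspaceConfig, mountCount: Long): Unit = {
  Option(config.mount_mapping_path).filter(_.nonEmpty) match {
    case Some(path) =>
      // Mapping validation: the file must be readable and carry the expected columns.
      val cols = spark.read.option("header", "true").csv(path).columns.map(_.toLowerCase)
      require(cols.contains("mountpoint") && cols.contains("source"),
        s"mount mapping file $path must contain 'mountPoint' and 'source' columns")
    case None =>
      // The >50 mount validation stays in force when no mapping file is supplied.
      require(mountCount <= 50,
        "workspace has more than 50 mounts; provide mount_mapping_path in the config")
  }
}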
@@ -188,8 +188,9 @@ class ParamDeserializer() extends StdDeserializer[OverwatchParams](classOf[Overw
      getOptionBoolean(masterNode, "apiEnvConfig.enableUnsafeSSL").getOrElse(false),
      getOptionInt(masterNode, "apiEnvConfig.threadPoolSize").getOrElse(4),
      getOptionLong(masterNode, "apiEnvConfig.apiWaitingTime").getOrElse(300000),
-     apiProxyNodeConfig
-     ))
+     apiProxyNodeConfig,
+     getOptionString(masterNode, "apiEnvConfig.mountMappingPath"))
+     )
} else {
None
}
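For reference, a minimal sketch of the corresponding apiEnvConfig block in the OverwatchParams JSON that this deserializer reads; the key names follow the getOption* lookups above, and the path value is hypothetical:

"apiEnvConfig": {
  "enableUnsafeSSL": false,
  "threadPoolSize": 4,
  "apiWaitingTime": 300000,
  "mountMappingPath": "dbfs:/configs/mount_mapping.csv"
}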
@@ -167,7 +167,8 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
      BronzeTargets.clustersSnapshotTarget,
      sparkLogClusterScaleCoefficient,
      config.apiEnv,
-     config.isMultiworkspaceDeployment
+     config.isMultiworkspaceDeployment,
+     config.organizationId
),
generateEventLogsDF(
database,
@@ -1,5 +1,6 @@
package com.databricks.labs.overwatch.pipeline

+ import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.env.Database
import com.databricks.labs.overwatch.eventhubs.AadAuthInstance
import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms.{workflowsCleanseJobClusters, workflowsCleanseTasks}
@@ -897,17 +898,17 @@ trait BronzeTransforms extends SparkSessionWrapper {


private[overwatch] def getAllEventLogPrefix(inputDataframe: DataFrame, apiEnv: ApiEnv): DataFrame = {
try{
val mountMap = getMountPointMapping(apiEnv) //Getting the mount info from api and cleaning the data
.withColumn("mount_point", when('mount_point.endsWith("/"), 'mount_point.substr(lit(0), length('mount_point) - 1)).otherwise('mount_point))
Contributor: Wasn't this implemented in #699? Curious as to why it's in this PR too. No problem, just want to confirm the logic is exactly the same. (Added #768 for this.)

Contributor (author): It's a different issue. When I get the source-to-mount mapping using dbutils, the data present in the CSV contains a trailing "/", so some of the joins were failing. #699 covers the "/" issue only in the apiURL.

Contributor: Ah, I see, thanks for clarifying.

.withColumn("source", when('source.endsWith("/"), 'source.substr(lit(0), length('source) - 1)).otherwise('source))
.filter(col("mount_point") =!= "/")

//Cleaning the data for cluster log path
val formattedInputDf = inputDataframe.withColumn("cluster_log_conf", when('cluster_log_conf.endsWith("/"), 'cluster_log_conf.substr(lit(0), length('cluster_log_conf) - 1)).otherwise('cluster_log_conf))
.withColumn("cluster_mount_point_temp", regexp_replace('cluster_log_conf, "dbfs:", ""))
.withColumn("cluster_mount_point", 'cluster_mount_point_temp)
// .withColumn("cluster_mount_point", regexp_replace('cluster_mount_point_temp, "//", "/"))


//Joining the cluster log data with mount point data
val joinDF = formattedInputDf
.join(mountMap, formattedInputDf.col("cluster_mount_point").startsWith(mountMap.col("mount_point")), "left") //starts with then when
@@ -925,11 +926,35 @@

val result = pathsDF.select('wildPrefix, 'cluster_id)
result
}catch {
case e:Exception=>
logger.log(Level.ERROR,"Unable to get all the event log prefix",e)
throw e
Contributor: Let's log the exception here.

Contributor (author): Done.

}

}
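As an aside on the trailing-slash handling discussed above, the normalization applied to mount_point, source, and cluster_log_conf can be sketched in isolation like this (a hypothetical helper, not part of the PR):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length, lit, when}

// "/mnt/logs/" -> "/mnt/logs"; values without a trailing "/" pass through unchanged.
def stripTrailingSlash(df: DataFrame, colName: String): DataFrame =
  df.withColumn(colName,
    when(col(colName).endsWith("/"),
      col(colName).substr(lit(0), length(col(colName)) - 1))
      .otherwise(col(colName)))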

private def getMountPointMapping(apiEnv: ApiEnv): DataFrame = {
-    val endPoint = "dbfs/search-mounts"
-    ApiCallV2(apiEnv, endPoint).execute().asDF()
+    try {
+      if (apiEnv.mountMappingPath.nonEmpty) {
+        logger.log(Level.INFO, "Reading cluster logs from " + apiEnv.mountMappingPath)
+        spark.read.option("header", "true")
+          .option("ignoreLeadingWhiteSpace", true)
+          .option("ignoreTrailingWhiteSpace", true)
+          .csv(apiEnv.mountMappingPath.get)
+          .withColumnRenamed("mountPoint", "mount_point")
+          .select("mount_point", "source")
+      } else {
+        logger.log(Level.INFO, "Calling dbfs/search-mounts for cluster logs")
+        val endPoint = "dbfs/search-mounts"
+        ApiCallV2(apiEnv, endPoint).execute().asDF()
+      }
+    } catch {
+      case e: Exception =>
+        logger.log(Level.ERROR, "ERROR while reading mount point", e)
+        throw e
+    }

}
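The CSV branch above expects a header row with mountPoint and source columns; an illustrative mapping file (the paths and storage accounts are hypothetical) might look like:

mountPoint,source
/mnt/cluster-logs/,abfss://logs@examplestorage.dfs.core.windows.net/cluster-logs/
/mnt/data,s3a://example-bucket/data

Trailing slashes in either column are tolerated because getAllEventLogPrefix strips them before the join.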


@@ -941,7 +966,8 @@ trait BronzeTransforms extends SparkSessionWrapper {
clusterSnapshotTable: PipelineTable,
sparkLogClusterScaleCoefficient: Double,
apiEnv: ApiEnv,
-     isMultiWorkSpaceDeployment: Boolean
+     isMultiWorkSpaceDeployment: Boolean,
+     organisationId: String
)(incrementalAuditDF: DataFrame): DataFrame = {

logger.log(Level.INFO, "Collecting Event Log Paths Glob. This can take a while depending on the " +
@@ -985,7 +1011,7 @@
// Build root level eventLog path prefix from clusterID and log conf
// /some/log/prefix/cluster_id/eventlog
val allEventLogPrefixes =
-     if(isMultiWorkSpaceDeployment) {
+     if(isMultiWorkSpaceDeployment && organisationId != Initializer.getOrgId) {
getAllEventLogPrefix(newLogDirsNotIdentifiedInAudit
.unionByName(incrementalClusterWLogging), apiEnv).select('wildPrefix).distinct()
} else {
@@ -1060,6 +1060,7 @@ object Schema extends SparkSessionWrapper {
StructField("error_batch_size", IntegerType, nullable = true),
StructField("enable_unsafe_SSL", BooleanType, nullable = true),
StructField("thread_pool_size", IntegerType, nullable = true),
StructField("api_waiting_time", LongType, nullable = true)
StructField("api_waiting_time", LongType, nullable = true),
StructField("mount_mapping_path", StringType, nullable = true)
))
}
@@ -389,7 +389,8 @@ class Config() {
setApiEnv(ApiEnv(isLocalTesting, workspaceURL, rawToken, packageVersion, derivedApiEnvConfig.successBatchSize,
derivedApiEnvConfig.errorBatchSize, runID, derivedApiEnvConfig.enableUnsafeSSL, derivedApiEnvConfig.threadPoolSize,
derivedApiEnvConfig.apiWaitingTime, derivedApiProxy.proxyHost, derivedApiProxy.proxyPort,
-     derivedApiProxy.proxyUserName, derivedApiProxy.proxyPasswordScope, derivedApiProxy.proxyPasswordKey
+     derivedApiProxy.proxyUserName, derivedApiProxy.proxyPasswordScope, derivedApiProxy.proxyPasswordKey,
+     derivedApiEnvConfig.mountMappingPath
))

this
@@ -49,7 +49,8 @@ case class ApiEnv(
proxyPort: Option[Int] = None,
proxyUserName: Option[String] = None,
proxyPasswordScope: Option[String] = None,
-  proxyPasswordKey: Option[String] = None
+  proxyPasswordKey: Option[String] = None,
+  mountMappingPath: Option[String] = None
)


@@ -59,7 +60,8 @@ case class ApiEnvConfig(
enableUnsafeSSL: Boolean = false,
threadPoolSize: Int = 4,
apiWaitingTime: Long = 300000,
-  apiProxyConfig: Option[ApiProxyConfig] = None
+  apiProxyConfig: Option[ApiProxyConfig] = None,
+  mountMappingPath: Option[String] = None
)
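For illustration only, the new field might be populated like this, assuming the remaining ApiEnvConfig fields keep the defaults shown above (the path is hypothetical):

val apiEnvConfig = ApiEnvConfig(
  apiProxyConfig = None,
  mountMappingPath = Some("dbfs:/configs/mount_mapping.csv") // hypothetical path
)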

case class ApiProxyConfig(
@@ -101,6 +103,7 @@ case class MultiWorkspaceConfig(workspace_name: String,
enable_unsafe_SSL: Option[Boolean]= None,
thread_pool_size: Option[Int] = None,
api_waiting_time: Option[Long] = None,
+  mount_mapping_path: String,
Contributor: This is an Option[String] everywhere else; do you have a default string when instantiating ApiProxyConfig if it's not present, through getOrElse or something? I didn't see that.

Contributor (author): [Screenshot 2023-02-20 at 10:53:58 PM]

While building the config I take the mountMappingPath as Some(config.mount_mapping_path) and check it with nonEmpty later on:

[Screenshot 2023-02-20 at 10:57:58 PM]

Let me know if I should change it to use a default value. I thought mount_mapping_path should not have any default value, which is why I implemented it this way.

Contributor: Nah, this is fine. Thanks for explaining.

deployment_id: String,
output_path: String
)