Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slash Management #825

Merged
merged 5 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.databricks.labs.overwatch
import com.databricks.labs.overwatch.env.Workspace
import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.pipeline._
import com.databricks.labs.overwatch.utils.Helpers.removeTrailingSlashes
import com.databricks.labs.overwatch.utils._
import com.databricks.labs.overwatch.validation.DeploymentValidation
import org.apache.log4j.{Level, Logger}
Expand Down Expand Up @@ -391,7 +392,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
try {
val baseConfig = generateBaseConfig(configLocation)
val multiWorkspaceConfig = baseConfig
.withColumn("api_url", when('api_url.endsWith("/"), 'api_url.substr(lit(0), length('api_url) - 1)).otherwise('api_url))
.withColumn("api_url", removeTrailingSlashes('api_url))
.withColumn("deployment_id", lit(deploymentId))
.withColumn("output_path", lit(outputPath))
.as[MultiWorkspaceConfig]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.env.Database
import com.databricks.labs.overwatch.eventhubs.AadAuthInstance
import com.databricks.labs.overwatch.pipeline.WorkflowsTransforms.{workflowsCleanseJobClusters, workflowsCleanseTasks}
import com.databricks.labs.overwatch.utils.Helpers.getDatesGlob
import com.databricks.labs.overwatch.utils.Helpers.{getDatesGlob, removeTrailingSlashes}
import com.databricks.labs.overwatch.utils.SchemaTools.structFromJson
import com.databricks.labs.overwatch.utils._
import com.databricks.labs.overwatch.api.{ApiCall, ApiCallV2}
Expand Down Expand Up @@ -887,11 +887,11 @@ trait BronzeTransforms extends SparkSessionWrapper {
private[overwatch] def getAllEventLogPrefix(inputDataframe: DataFrame, apiEnv: ApiEnv): DataFrame = {
try{
val mountMap = getMountPointMapping(apiEnv) //Getting the mount info from api and cleaning the data
.withColumn("mount_point", when('mount_point.endsWith("/"), 'mount_point.substr(lit(0), length('mount_point) - 1)).otherwise('mount_point))
.withColumn("source", when('source.endsWith("/"), 'source.substr(lit(0), length('source) - 1)).otherwise('source))
.withColumn("mount_point", removeTrailingSlashes('mount_point))
.withColumn("source", removeTrailingSlashes('source))
.filter(col("mount_point") =!= "/")
//Cleaning the data for cluster log path
val formattedInputDf = inputDataframe.withColumn("cluster_log_conf", when('cluster_log_conf.endsWith("/"), 'cluster_log_conf.substr(lit(0), length('cluster_log_conf) - 1)).otherwise('cluster_log_conf))
val formattedInputDf = inputDataframe.withColumn("cluster_log_conf", removeTrailingSlashes('cluster_log_conf))
.withColumn("cluster_mount_point_temp", regexp_replace('cluster_log_conf, "dbfs:", ""))
.withColumn("cluster_mount_point", 'cluster_mount_point_temp)
// .withColumn("cluster_mount_point", regexp_replace('cluster_mount_point_temp, "//", "/"))
Expand Down Expand Up @@ -1005,7 +1005,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
} else {
newLogDirsNotIdentifiedInAudit
.unionByName(incrementalClusterWLogging)
.withColumn("cluster_log_conf", when('cluster_log_conf.endsWith("/"), 'cluster_log_conf.substr(lit(0), length('cluster_log_conf) - 1)) .otherwise('cluster_log_conf))
.withColumn("cluster_log_conf",removeTrailingSlashes('cluster_log_conf))
.withColumn("topLevelTargets", array(col("cluster_log_conf"), col("cluster_id"), lit("eventlog")))
.withColumn("wildPrefix", concat_ws("/", 'topLevelTargets))
.select('wildPrefix)
Expand Down
56 changes: 54 additions & 2 deletions src/main/scala/com/databricks/labs/overwatch/utils/Tools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.databricks.labs.overwatch.utils
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.databricks.labs.overwatch.env.Workspace
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

import java.io.FileNotFoundException
import com.databricks.labs.overwatch.pipeline
import com.databricks.labs.overwatch.pipeline.TransformFunctions.datesStream
Expand All @@ -17,7 +16,7 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.hadoop.conf._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.util.SerializableConfiguration
Expand Down Expand Up @@ -883,5 +882,58 @@ object Helpers extends SparkSessionWrapper {

}

/**
 * Normalizes a URL/path: trims surrounding whitespace, collapses duplicate slashes and
 * removes the trailing slash.
 *
 * Duplicate slashes are collapsed BEFORE the trailing slash is stripped, so an input that
 * ends in several slashes (e.g. "s3://bucket/path//") does not keep a trailing slash.
 *
 * @param url raw URL or path; must not be null
 * @return the sanitized URL
 */
def sanitizeURL(url: String): String = {
  val inputUrl = url.trim
  // Nothing to normalize for a blank input; return it without calling the helpers.
  if (inputUrl.isEmpty) inputUrl
  else removeTrailingSlashes(removeDuplicateSlashes(inputUrl))
}

/**
 * Collapses every run of consecutive slashes in the given URL into a single slash, then
 * restores the "//" that follows the scheme for s3a, s3, gs, abfss, http and https URLs.
 * Other inputs (e.g. dbfs:/ paths) keep a single slash after the scheme.
 *
 * @param url URL or path to clean; must not be null
 * @return the URL without duplicate slashes
 */
def removeDuplicateSlashes(url: String): String = {
  // "/+" collapses runs of any length; replacing "//" pairwise would leave "//" behind for "///".
  val collapsed = url.replaceAll("/+", "/")
  val doubleSlashSchemes = Seq("s3a:/", "s3:/", "gs:/", "abfss:/", "http:/", "https:/")
  // Only a scheme at the very start qualifies: replaceFirst doubles the first slash in the
  // string, which is the scheme separator only when the URL actually begins with the scheme.
  if (doubleSlashSchemes.exists(scheme => collapsed.startsWith(scheme))) {
    collapsed.replaceFirst("/", "//")
  } else {
    collapsed
  }
}

/**
 * Removes a single trailing slash from the URL, if one is present.
 * Only the last character is dropped; an input ending in "//" still ends in "/".
 * An empty string is returned unchanged.
 *
 * @param url URL or path; must not be null
 * @return the URL without its final slash
 */
def removeTrailingSlashes(url: String): String = {
  // endsWith is safe for "", unlike comparing lastIndexOf("/") with length - 1 (both are -1 there).
  if (url.endsWith("/")) url.substring(0, url.length - 1) else url
}

/**
 * Column variant: drops the final character of the value when it ends with a slash,
 * otherwise leaves the value untouched.
 *
 * @param url column holding the URL or path
 * @return column expression with the trailing slash removed
 */
def removeTrailingSlashes(url: Column): Column = {
  val hasTrailingSlash = url.endsWith("/")
  // Keeps the first (length - 1) characters of the value.
  val withoutLastChar = url.substr(lit(0), length(url) - 1)
  when(hasTrailingSlash, withoutLastChar).otherwise(url)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,57 @@ class HelpersTest extends AnyFunSpec {
assert(Helpers.isNumeric("1234") == true)
assert(Helpers.isNumeric("abcd") == false)
}
it("Trailing slash test") {
  // With or without a trailing slash, the result must be the same bare URL.
  val expected = "http://hiii.com"
  Seq("http://hiii.com/", "http://hiii.com").foreach { input =>
    assert(Helpers.removeTrailingSlashes(input) == expected)
  }
}
it("Duplicate slash test") {
  // (input, expected) pairs, grouped by storage / URL scheme and checked in order.
  val cases = Seq(
    // dbfs
    "dbfs:/mnt//cluster_logs/0310-182313-l2tbouaj" -> "dbfs:/mnt/cluster_logs/0310-182313-l2tbouaj",
    "dbfs://mnt//cluster_logs/0310-182313-l2tbouaj" -> "dbfs:/mnt/cluster_logs/0310-182313-l2tbouaj",
    "dbfs://mnt//cluster_logs/0310-182313-l2tbouaj/" -> "dbfs:/mnt/cluster_logs/0310-182313-l2tbouaj/",
    // Azure
    "abfss://LOCATION1//ABC//ccc" -> "abfss://LOCATION1/ABC/ccc",
    "abfss://LOCATION1/ABC//ccc" -> "abfss://LOCATION1/ABC/ccc",
    "abfss:/LOCATION1//ABC//ccc" -> "abfss://LOCATION1/ABC/ccc",
    // AWS (s3a)
    "s3a://databricks-field-eng-audit-logs/demo" -> "s3a://databricks-field-eng-audit-logs/demo",
    "s3a://databricks-field-eng-audit-logs//demo" -> "s3a://databricks-field-eng-audit-logs/demo",
    "s3a:/databricks-field-eng-audit-logs//demo" -> "s3a://databricks-field-eng-audit-logs/demo",
    "s3a:/databricks-field-eng-audit-logs//demo/" -> "s3a://databricks-field-eng-audit-logs/demo/",
    // AWS (s3)
    "s3://databricks-field-eng-audit-logs//demo/" -> "s3://databricks-field-eng-audit-logs/demo/",
    "s3://databricks-field-eng-audit-logs/demo" -> "s3://databricks-field-eng-audit-logs/demo",
    "s3://databricks-field-eng-audit-logs//demo/" -> "s3://databricks-field-eng-audit-logs/demo/",
    "s3:/databricks-field-eng-audit-logs//demo/" -> "s3://databricks-field-eng-audit-logs/demo/",
    // GCP
    "gs://overwatch-global-gcp/test_mws" -> "gs://overwatch-global-gcp/test_mws",
    "gs://overwatch-global-gcp//test_mws" -> "gs://overwatch-global-gcp/test_mws",
    "gs://overwatch-global-gcp//test_mws/" -> "gs://overwatch-global-gcp/test_mws/",
    "gs:/overwatch-global-gcp//test_mws/" -> "gs://overwatch-global-gcp/test_mws/",
    // HTTP
    "http://databricks.com" -> "http://databricks.com",
    "http:/databricks.com" -> "http://databricks.com",
    "http:/databricks.com/" -> "http://databricks.com/",
    // HTTPS
    "https://databricks.com" -> "https://databricks.com",
    "https:/databricks.com" -> "https://databricks.com",
    "https://databricks.com/" -> "https://databricks.com/"
  )
  cases.foreach { case (input, expected) =>
    assert(Helpers.removeDuplicateSlashes(input) == expected)
  }
}
it("test sanitizeURL") {
  // (input, expected) pairs: duplicate slashes collapsed and trailing slash removed.
  val cases = Seq(
    "https://databricks.com" -> "https://databricks.com",
    "https:/databricks.com" -> "https://databricks.com",
    "gs:/overwatch-global-gcp//test_mws/" -> "gs://overwatch-global-gcp/test_mws",
    "dbfs://mnt//cluster_logs/0310-182313-l2tbouaj/" -> "dbfs:/mnt/cluster_logs/0310-182313-l2tbouaj",
    "s3a:/databricks-field-eng-audit-logs//demo/" -> "s3a://databricks-field-eng-audit-logs/demo",
    "s3a://databricks-field-eng-audit-logs//demo/" -> "s3a://databricks-field-eng-audit-logs/demo"
  )
  cases.foreach { case (input, expected) =>
    assert(Helpers.sanitizeURL(input) == expected)
  }
}

}

Expand Down