Commit 25671b7
recon enhancement done to deal with different columns in source and target (#1216)

* Initial commit

* recon enhancement done to deal with different columns in source and target

---------

Co-authored-by: Guenia <guenia.izquierdo@databricks.com>
sriram251-code and gueniai committed Jul 8, 2024
1 parent 97236ae commit 25671b7
Showing 2 changed files with 58 additions and 23 deletions.
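
In short: the recon previously assumed source and target shared an identical schema, so a column present on only one side could make the hash comparison flag every row as missing. The change intersects the two column sets, hashes only the shared columns, and records each side's extra columns in the report. A minimal sketch of that alignment step (hypothetical sourceDF/targetDF standing in for the two pipeline tables, not the committed method):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Returns both frames reduced to their shared columns, plus the
    // comma-separated column names unique to each side.
    def alignColumns(sourceDF: DataFrame, targetDF: DataFrame): (DataFrame, DataFrame, String, String) = {
      val commonColumns = sourceDF.columns.intersect(targetDF.columns)
      val sourceOnly = sourceDF.columns.diff(targetDF.columns).mkString(",")
      val targetOnly = targetDF.columns.diff(sourceDF.columns).mkString(",")
      (sourceDF.select(commonColumns.map(col): _*),
        targetDF.select(commonColumns.map(col): _*),
        sourceOnly, targetOnly)
    }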
File 1 of 2: ReconReport case class:

@@ -160,6 +160,8 @@ case class ReconReport(
   deviationPercentage: Option[Double] = None,
   sourceQuery: Option[String] = None,
   targetQuery: Option[String] = None,
+  sourceOnlyColumns: Option[String] = None,
+  targetOnlyColumns: Option[String] = None,
   errorMsg: Option[String] = None
 )
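
The two new Option fields surface schema drift alongside the row counts. A hypothetical report for a table whose source side carries one extra column (all values below are illustrative, not from the commit):

    ReconReport(
      workspaceId = "1234",                    // hypothetical workspace id
      reconType = "Validation by hashing",
      sourceDB = "overwatch_etl",              // hypothetical ETL database names
      targetDB = "overwatch_etl_new",
      tableName = "jobrun_silver",             // hypothetical table
      sourceOnlyColumns = Some("legacy_cost"), // column present only in the source
      targetOnlyColumns = Some(""),            // nothing unique to the target
      errorMsg = Some(""))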

File 2 of 2: object DataReconciliation:
@@ -6,7 +6,8 @@ import com.databricks.labs.overwatch.utils.Helpers.getAllPipelineTargets
 import com.databricks.labs.overwatch.utils.{Helpers, ReconReport, SparkSessionWrapper}
 import org.apache.log4j.Logger
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions.{col, hash, lit}
+import org.apache.spark.sql.functions.{col, concat_ws, hash, lit, sha2}
+
 
 import java.time.LocalDateTime
 import scala.collection.mutable.ArrayBuffer
@@ -36,10 +37,11 @@ object DataReconciliation extends SparkSessionWrapper {
     performBasicRecon(sourceOrgIDArr, targetOrgIDArr)
     val sourceWorkspace = getConfig(sourceEtl, sourceOrgIDArr(0))
     val targetWorkspace = getConfig(targetEtl, targetOrgIDArr(0))
-    val targets = getAllPipelineTargets(sourceWorkspace)
-    println("Number of tables for recon: "+targets.length)
-    println(targets.foreach(t => println(t.name)))
-    val report = runRecon(targets, sourceEtl, sourceOrgIDArr, targetEtl)
+    val sourceTargets = getAllPipelineTargets(sourceWorkspace)
+    val targetTargets = getAllPipelineTargets(targetWorkspace)
+    println("Number of tables for recon: " + sourceTargets.length)
+    sourceTargets.foreach(t => println(t.name))
+    val report = runRecon(sourceTargets, targetTargets, sourceEtl, sourceOrgIDArr, targetEtl)
     val reconRunId: String = java.util.UUID.randomUUID.toString
     val etlStoragePrefix = targetWorkspace.getConfig.etlDataPathPrefix.substring(0, targetWorkspace.getConfig.etlDataPathPrefix.length - 13)
     saveReconReport(report, etlStoragePrefix, "ReconReport", reconRunId)
@@ -59,18 +61,27 @@
    * @param targetEtl
    * @return
    */
-  private def hashValidation(target: PipelineTable, orgId: String, sourceEtl: String, targetEtl: String): ReconReport = {
+  private def hashValidation(sourceTarget: PipelineTable, targetTarget: PipelineTable, orgId: String, sourceEtl: String, targetEtl: String): ReconReport = {
     val reconType = "Validation by hashing"
     try {
-      val sourceQuery = getQuery(s"""${target.tableFullName}""", orgId)
-      val sourceTable = hashAllColumns(getTableDF(sourceQuery, target))
-      val targetQuery = getQuery(s"""${target.tableFullName.replaceAll(sourceEtl, targetEtl)}""", orgId)
-      val targetTable = hashAllColumns(getTableDF(targetQuery, target))
-      val sourceCount = sourceTable.count()
-      val targetCount = targetTable.count()
-      val missingSourceCount = targetTable.exceptAll(sourceTable).count()
-      val missingTargetCount = sourceTable.exceptAll(targetTable).count()
-      val commonDataCount = sourceTable.intersectAll(targetTable).count()
+      val sourceQuery = getQuery(s"""${sourceTarget.tableFullName}""", orgId)
+      val targetQuery = getQuery(s"""${targetTarget.tableFullName}""", orgId)
+      val sourceDF = sourceTarget.asDF.filter(col("organization_id") === orgId)
+      val targetDF = targetTarget.asDF.filter(col("organization_id") === orgId)
+      val commonColumns = sourceDF.columns.intersect(targetDF.columns)
+      val colInSourceNotInTarget = sourceDF.columns.diff(targetDF.columns).mkString(",")
+      val colInTargetNotInSource = targetDF.columns.diff(sourceDF.columns).mkString(",")
+      val sourceTable = hashAllColumns(getTableDF(sourceDF.select(commonColumns.map(col): _*), sourceTarget))
+      val targetTable = hashAllColumns(getTableDF(targetDF.select(commonColumns.map(col): _*), targetTarget))
+      val sumSource = sourceTable.withColumn("hash", sha2(concat_ws("||", sourceTable.columns.map(col): _*), 256)).select("hash")
+      val sumTarget = targetTable.withColumn("hash", sha2(concat_ws("||", targetTable.columns.map(col): _*), 256)).select("hash")
+      sourceTable.unpersist()
+      targetTable.unpersist()
+      val sourceCount = sumSource.count()
+      val targetCount = sumTarget.count()
+      val missingSourceCount = sumTarget.exceptAll(sumSource).count()
+      val missingTargetCount = sumSource.exceptAll(sumTarget).count()
+      val commonDataCount = sumSource.intersectAll(sumTarget).count()
       val deviationFactor = {
         if ((missingSourceCount + missingTargetCount) == 0) {
           1
@@ -100,7 +111,7 @@
         reconType = reconType,
         sourceDB = sourceEtl,
         targetDB = targetEtl,
-        tableName = target.name,
+        tableName = targetTarget.name,
         sourceCount = Some(sourceCount),
         targetCount = Some(targetCount),
         missingInSource = Some(missingSourceCount),
@@ -109,6 +120,8 @@
         deviationPercentage = Some(deviation),
         sourceQuery = Some(sourceQuery),
         targetQuery = Some(targetQuery),
+        sourceOnlyColumns = Some(colInSourceNotInTarget),
+        targetOnlyColumns = Some(colInTargetNotInSource),
         errorMsg = Some(""))
 
     } catch {
@@ -120,7 +133,7 @@
         reconType = reconType,
         sourceDB = sourceEtl,
         targetDB = targetEtl,
-        tableName = target.tableFullName,
+        tableName = targetTarget.tableFullName,
         errorMsg = Some(fullMsg)
       )
     }
@@ -129,6 +142,7 @@
 
   }
 
+
   /**
    * This method runs the reconciliation for all targets in parallel.
    *
@@ -138,16 +152,35 @@
    * @param targetEtl : ETL name of the current version of OW
    * @return Array of ReconReport
    */
-  private[overwatch] def runRecon(targets: ParArray[PipelineTable],
+  private[overwatch] def runRecon(sourceTargets: ParArray[PipelineTable],
+                                  targetTargets: ParArray[PipelineTable],
                                   sourceEtl: String,
                                   sourceOrgIDArr: Array[String],
                                   targetEtl: String,
                                  ): Array[ReconReport] = {
     spark.conf.set("spark.sql.legacy.allowHashOnMapType", "true")
     val reconStatus: ArrayBuffer[ReconReport] = new ArrayBuffer[ReconReport]()
     sourceOrgIDArr.foreach(orgId => {
-      targets.foreach(target => {
-        reconStatus.append(hashValidation(target, orgId, sourceEtl, targetEtl))
+      sourceTargets.foreach(st => {
+        println("performing recon for " + st.name)
+        var tableFound = false
+        targetTargets.foreach(tt =>
+          if (tt.name == st.name) {
+            tableFound = true
+            reconStatus.append(hashValidation(st, tt, orgId, sourceEtl, targetEtl))
+          }
+        )
+        if (!tableFound) {
+          reconStatus.append(ReconReport(
+            workspaceId = orgId,
+            reconType = "Validation by hashing",
+            sourceDB = sourceEtl,
+            targetDB = targetEtl,
+            tableName = st.name,
+            errorMsg = Some("Table not found in target")
+          ))
+        }
       })
     }
     )
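
A note on the matching strategy: the committed loop scans targetTargets once per source table, which is quadratic in the table count (likely harmless at these table counts). An equivalent formulation, sketched here rather than taken from the commit, indexes the target tables by name first:

    // Sketch only: same semantics as the diff's nested loop; assumes the
    // enclosing sourceOrgIDArr loop still provides orgId.
    // `.seq` materializes the ParArray so toMap is available.
    val targetByName: Map[String, PipelineTable] =
      targetTargets.seq.map(t => t.name -> t).toMap

    sourceTargets.foreach { st =>
      targetByName.get(st.name) match {
        case Some(tt) =>
          reconStatus.append(hashValidation(st, tt, orgId, sourceEtl, targetEtl))
        case None =>
          reconStatus.append(ReconReport(
            workspaceId = orgId,
            reconType = "Validation by hashing",
            sourceDB = sourceEtl,
            targetDB = targetEtl,
            tableName = st.name,
            errorMsg = Some("Table not found in target")))
      }
    }

As in the committed code, appending to a shared ArrayBuffer from a ParArray foreach happens concurrently; a thread-safe buffer, or mapping the parallel collection and collecting the results, would sidestep that race.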
@@ -204,15 +237,15 @@
    * @param target : PipelineTable object
    * @return DataFrame
    */
-  private def getTableDF(query: String, target: PipelineTable): DataFrame = {
+  private def getTableDF(df: DataFrame, target: PipelineTable): DataFrame = {
     try {
       val excludedCol = target.excludedReconColumn
       val dropCol = excludedCol ++ Array("Overwatch_RunID", "Pipeline_SnapTS", "__overwatch_ctrl_noise")
-      val filterDF = spark.sql(query).drop(dropCol: _*)
+      val filterDF = df.drop(dropCol: _*)
       filterDF
     } catch {
       case exception: Exception =>
-        println(s"""Exception: Unable to run the query ${query}""" + exception.getMessage)
+        println(s"""Exception: Unable to filter dataframe ${target.tableFullName}: """ + exception.getMessage)
         spark.emptyDataFrame
     }

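For reference, the count arithmetic the report is built from: each row is reduced to one sha2 digest of its pipe-delimited column values, and the two digest multisets are compared with exceptAll/intersectAll, so sourceCount = commonDataCount + missingTargetCount and targetCount = commonDataCount + missingSourceCount. A self-contained toy run (hypothetical three-row tables, not Overwatch data; the per-column hashAllColumns step is omitted here):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, concat_ws, sha2}

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val src = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("k", "v")
    val tgt = Seq(("a", 1), ("b", 2), ("d", 4)).toDF("k", "v")

    // One sha2-256 digest per row, mirroring the sumSource/sumTarget step.
    def rowHashes(df: DataFrame) =
      df.select(sha2(concat_ws("||", df.columns.map(col): _*), 256).as("hash"))

    val (s, t) = (rowHashes(src), rowHashes(tgt))
    s.count()                 // 3 -> sourceCount
    t.count()                 // 3 -> targetCount
    t.exceptAll(s).count()    // 1 -> missingInSource: ("d", 4) exists only in target
    s.exceptAll(t).count()    // 1 -> missingInTarget: ("c", 3) exists only in source
    s.intersectAll(t).count() // 2 -> commonDataCount

A side note on why the committed code hashes each column (hashAllColumns) before concatenating: concat_ws silently skips nulls, so per-column hashing first normalizes nulls into concrete values and keeps distinct rows from colliding on the same digest.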
