
Enable Incremental in Bronze Snapshot #805

Closed

Conversation

souravbaner-da (Contributor)
Enable Incremental in Bronze Snapshot

GeekSheikh and others added 3 commits March 6, 2023 12:19
* initial commit

* Refactor InitializerFunctions.scala

* Refactor InitializerFunctions.scala

* Change Scala Sources Name

* Refactor InitializerFunctions.scala

* Refactor InitializerFunctions.scala

* Added Initializerv2.scala

* Added Initializerv2.scala

* Changed as per Sriram comment

* Changed as per Sriram comment

* dropped Initializer Deprecated

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>
@souravbaner-da souravbaner-da added the enhancement New feature or request label Mar 8, 2023
@souravbaner-da souravbaner-da added this to the 0.7.2.0 milestone Mar 8, 2023
@souravbaner-da souravbaner-da self-assigned this Mar 8, 2023
@souravbaner-da souravbaner-da linked an issue Mar 8, 2023 that may be closed by this pull request
@@ -95,6 +95,37 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
Helpers.parClone(cloneSpecs)

}
def snapshotStream(
I think this needs to be moved to workspace and called incrementalSnap.

Currently users back up using workspace.snap; after this is merged, customers should be able to call workspace.incrementalSnap.

Add code docs on what is expected to be passed in for these variables and an English explanation of what this function does.

Be sure to confirm that the snapshot backs up ALL data from all organization_ids. This means you need to test against a deployment that has multiple workspaces (organization_ids) in the same bronze tables.

def snapshotStream(
targetPrefix: String,
I think we should change the name of this in this func and in the existing .snap func. It's too close to "target prefix", which in PipelineTable means the ETL prefix. I think this should be renamed to something like snapshotRootPath to make it clearer.

.filter(_.exists()) // source path must exist
.filterNot(t => cleanExcludes.contains(t.name.toLowerCase))

val finalTargetPathPrefix = s"${targetPrefix}/bckup"
why are we nesting this into a subdir here?

I think this makes sense if it were called something like s"${targetPrefix}/data" which would allow for another folder called s"${targetPrefix}/report" where we could persist the snapshot reports.

* @param cloneDetails details required to execute the parallelized clone
* @return
*/
def tableStream(cloneDetails: Seq[CloneDetail]): Seq[CloneReport] = {
this should be private[overwatch] I think since I don't think we want to allow use outside.

let's rename the func to make it more clear that it's only a snapStream.

I'm on the fence as to whether we should have a class/object called snapshot and refactor the core logic out of this Helpers section. We could move snap and this new snapStream over to the other location. We could have a Helper function that then instantiates/calls the core logic from Snapshot.scala or something. What do you think?

I think we need to persist the clone report. This brings more credence to the idea of refactoring the snapshots out of these helper sections to allow for a more complex / controlled workflow to happen. I even think there could be a main class for snapshotting since everyone will be doing this from a scheduled job anyway.

Comment on lines 119 to 122
val cloneSpecs = targetsToSnap.map(t => {
val targetPath = s"${finalTargetPathPrefix}/${t.name.toLowerCase}"
CloneDetail(t.tableLocation, targetPath)
})
this looks like reusable code -- move to a function?

cloneSpecs
}

def main(args: Array[String]): Unit = {
Add documentation for the function.

Helpers.snapStream(cloneSpecs, snapshotRootPath)
}

def buildCloneSpecs(
make it private

@sonarcloud
sonarcloud bot commented Apr 5, 2023

Kudos, SonarCloud Quality Gate passed!

Bugs: 0 (A)
Vulnerabilities: 0 (A)
Security Hotspots: 0 (A)
Code Smells: 1 (A)
Coverage: no coverage information
Duplication: 0.0%
@GeekSheikh GeekSheikh left a comment
Is there a proof for this? Don't just test that it runs -- please test that the data is incrementally appearing in the output, that it looks correct, and that the restore functionality works as expected.

* @return
*/
private[overwatch] def snap(
bronze: Bronze,
if you change this to pipelines and let the user pass in comma delimited list it will be easier for customers to use and it would also allow us to snap any and / or all pipelines.

Suggest you accept similar to MultiWorkspace configs (i.e. "bronze,silver,gold") and then split them, get the pipelines and the necessary tables.
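A sketch of how that comma-delimited argument could be parsed (the object and method names here are hypothetical illustrations, not the actual Overwatch API):

```scala
// Hypothetical sketch: parse a MultiWorkspace-style pipelines argument
// such as "bronze,silver,gold" into a normalized, de-duplicated list.
object PipelineArgs {
  def parsePipelines(pipelinesArg: String): Seq[String] = {
    pipelinesArg.split(",")
      .map(_.trim.toLowerCase) // tolerate whitespace and mixed case
      .filter(_.nonEmpty)      // ignore stray commas
      .distinct
      .toSeq
  }
}
```

Each returned name could then be mapped to its pipeline and the necessary tables.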

*
* @param arg(0) Source Database Name or Source Remote_OverwatchGlobalPath
* @param arg(1) Target snapshotRootPath
* @param arg(2) Flag to determine whether the snap is a normal batch process or an incremental one (if "true" then incremental, else a normal snap)
Suggest changing this to accept "incremental" or "full" to make it easier to use, since a bare true/false flag doesn't convey the intent.

*/


def main(args: Array[String]): Unit = {
please create variable names for each of the args at the very beginning and then use those variables throughout the code to make it easier to support / debug.

i.e.
val source = args(0)
val targetPath = args(1)
...

/**
* Create a backup of the Overwatch datasets
*
* @param arg(0) Source Database Name or Source Remote_OverwatchGlobalPath
This comment is very confusing. You're accepting either a source database or source path prefix. I'm not sure what to do with Source Remote_OverwatchGlobalPath


val workspace = if (args(0).contains("/")){
val remote_workspace_id = args(3)
val pathToOverwatchGlobalShareData = args(0)
Should the path be the etl_prefix or the etl_prefix/global_share? Perhaps you called this out in the docs; if so, great. Otherwise, please make sure it's very clear, and also be sure to fail if it's not correct. Don't hardcode global_share.

@@ -472,7 +523,9 @@ object Helpers extends SparkSessionWrapper {
spark.sql(baseCloneStatement)
CloneReport(cloneSpec, baseCloneStatement, msg)
}
case e: Throwable => CloneReport(cloneSpec, stmt, e.getMessage)
case e: Throwable => {
CloneReport(cloneSpec, stmt, e.getMessage)
create an error message using appendStackStrace similar to below

        val failMsg = PipelineFunctions.appendStackStrace(e)
        val msg = s"FAILED: $moduleId-$moduleName Module: API CALL Failed\n$failMsg"

@@ -40,6 +41,7 @@ class InitializeTest extends AnyFunSpec with DataFrameComparer with SparkSession

it ("initializeDatabase function should create both elt and consumer database") {
import spark.implicits._
val sc: SparkContext = spark.sparkContext
please remove this and the import as they are not used and not needed.

CloneReport(cloneSpec, s"Streaming For: ${cloneSpec.source} --> ${cloneSpec.target}", "SUCCESS")
} catch {
case e: Throwable if (e.getMessage.contains("is after the latest commit timestamp of")) => {
val msg = s"SUCCESS WITH WARNINGS: The timestamp provided, ${cloneSpec.asOfTS.get} " +
Use the appendStackStrace method shown in other comment to build this message with the full stack trace as well

Comment on lines +458 to +460
val sourceName = s"${cloneSpec.source}".split("/").takeRight(1).head
val checkPointLocation = if (snapshotRootPath.takeRight(1) == "/") s"${snapshotRootPath}checkpoint/${sourceName}" else s"${snapshotRootPath}/checkpoint/${sourceName}"
val cloneReportPath = if (snapshotRootPath.takeRight(1) == "/") s"${snapshotRootPath}report/" else s"${snapshotRootPath}/clone_report/"
use the new path slash helpers introduced in #825 -- these are merged to 0720 now so rebase and you should have access to them.
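For illustration, a minimal version of such a helper might look like this (the actual helpers merged in #825 may differ in name and semantics):

```scala
// Hypothetical stand-in for the path slash helpers referenced above:
// normalize a root path to end with exactly one slash before appending
// child segments, removing the duplicated if/else on takeRight(1).
object PathUtils {
  def ensureTrailingSlash(path: String): String =
    if (path.endsWith("/")) path else path + "/"

  def child(root: String, segment: String): String =
    ensureTrailingSlash(root) + segment
}
```

With something like this, the two branches above collapse to single expressions such as `PathUtils.child(snapshotRootPath, s"checkpoint/$sourceName")`.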

* @param cloneDetails details required to execute the parallelized clone
* @return
*/
private[overwatch] def snapStream(cloneDetails: Seq[CloneDetail],snapshotRootPath: String): Unit = {
move the getQueryListener code to Helpers and update Database.scala -- then reuse this to get a query listener for the stream and provide updates to the logs on query progressions similar to how it's done today in Database.scala. This will provide the user and support with status updates which are great for longer running Trigger.Once streams

private def getQueryListener(query: StreamingQuery, minEventsPerTrigger: Long): StreamingQueryListener = {
  val streamManager = new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
      println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
      println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
      println("Query made progress: " + queryProgress.progress)
      if (config.debugFlag) {
        println(query.status.prettyJson)
      }
      if (queryProgress.progress.numInputRows <= minEventsPerTrigger) {
        query.stop()
      }
    }
  }
  streamManager
}
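For context, wiring that listener into a Trigger.Once snapshot stream might look roughly like this (a sketch only, requiring a running SparkSession; names such as sourceDF, targetPath, and checkPointLocation are illustrative and not the actual snapStream implementation):

```scala
import org.apache.spark.sql.streaming.Trigger

// Illustrative sketch: start the clone stream, attach the listener so query
// progress shows up in the logs, then detach once the stream finishes.
val streamingQuery = sourceDF.writeStream
  .format("delta")
  .option("checkpointLocation", checkPointLocation)
  .trigger(Trigger.Once())
  .start(targetPath)

val listener = getQueryListener(streamingQuery, minEventsPerTrigger = 0L)
spark.streams.addListener(listener)    // progress updates now reach the logs
streamingQuery.awaitTermination()
spark.streams.removeListener(listener) // avoid leaking listeners across runs
```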

@GeekSheikh GeekSheikh modified the milestones: 0.7.2.0, 0.7.3.0 Apr 11, 2023
@souravbaner-da
Contributor Author

Duplicate PR already raised from Release-721
#909

Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEAT] - Bronze Snapshots - Enable Incremental
5 participants