Enable Incremental in Bronze Snapshot #805
Conversation
* initial commit
* Refractor InitializerFunctions.scala
* Refractor InitializerFunctions.scala
* Change Scala Sources Name
* Refractor InitializerFunctions.scala
* Refractor InitializerFunctions.scala
* Added Initializerv2.scala
* Added Initializerv2.scala
* Changed as per Sriram comment
* Changed as per Sriram comment
* dropped Initializer Deprecated

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>
@@ -95,6 +95,37 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
    Helpers.parClone(cloneSpecs)
  }

  def snapshotStream(
I think this needs to be moved to Workspace and called incrementalSnap. Currently users back up using workspace.snap; after this is merged, customers should be able to call workspace.incrementalSnap.
Add code docs on what is expected to be passed in for these variables and an English explanation of what this function does.
Be sure to confirm that the snapshot backs up ALL data from all organization_ids. This means you need to test on a deployment that has multiple workspaces in the same bronze tables.
@@ -95,6 +95,37 @@ class Bronze(_workspace: Workspace, _database: Database, _config: Config)
    Helpers.parClone(cloneSpecs)
  }

  def snapshotStream(
    targetPrefix: String,
I think we should change the name of this in this func and in the existing .snap func. It's too close to targetPrefix meaning the PipelineTable ETL prefix. I think this should be renamed to something like snapshotRootPath to make it more clear.
  .filter(_.exists()) // source path must exist
  .filterNot(t => cleanExcludes.contains(t.name.toLowerCase))

val finalTargetPathPrefix = s"${targetPrefix}/bckup"
why are we nesting this into a subdir here?
I think this makes sense if it were called something like s"${targetPrefix}/data", which would allow for another folder called s"${targetPrefix}/report" where we could persist the snapshot reports.
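A minimal sketch of that layout; dataPath/reportPath are assumed helper names for illustration, not part of the PR:

```scala
// Hypothetical helpers; the review only suggests the directory layout.
object SnapshotPaths {
  // Subdirectory that receives the cloned table data
  def dataPath(snapshotRootPath: String): String =
    s"${snapshotRootPath.stripSuffix("/")}/data"

  // Sibling subdirectory where snapshot/clone reports could be persisted
  def reportPath(snapshotRootPath: String): String =
    s"${snapshotRootPath.stripSuffix("/")}/report"
}
```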
 * @param cloneDetails details required to execute the parallelized clone
 * @return
 */
def tableStream(cloneDetails: Seq[CloneDetail]): Seq[CloneReport] = {
This should be private[overwatch], I think, since I don't think we want to allow use outside. Let's rename the func to make it more clear that it's only a snapStream.
I'm on the fence as to whether we should have a class/object called Snapshot and refactor the core logic out of this Helpers section. We could move snap and this new snapStream over to the other location, and have a Helper function that then instantiates/calls the core logic from Snapshot.scala or something. What do you think?
I think we need to persist the clone report. This brings more credence to the idea of refactoring the snapshots out of these helper sections to allow for a more complex / controlled workflow to happen. I even think there could be a main class for snapshotting since everyone will be doing this from a scheduled job anyway.
val cloneSpecs = targetsToSnap.map(t => {
  val targetPath = s"${finalTargetPathPrefix}/${t.name.toLowerCase}"
  CloneDetail(t.tableLocation, targetPath)
})
this looks like reusable code -- move to a function?
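One way to sketch that extraction; CloneDetail is simplified here to two fields, and the real case class and call sites in Overwatch differ:

```scala
// Simplified stand-in for Overwatch's CloneDetail case class
case class CloneDetail(source: String, target: String)

object SnapshotHelpers {
  // Builds one CloneDetail per table, lower-casing the table name into the
  // target path, mirroring the repeated mapping in this PR.
  def buildCloneSpecs(
    finalTargetPathPrefix: String,
    tableLocations: Map[String, String] // table name -> source location
  ): Seq[CloneDetail] =
    tableLocations.toSeq.map { case (name, location) =>
      CloneDetail(location, s"$finalTargetPathPrefix/${name.toLowerCase}")
    }
}
```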
  cloneSpecs
}

def main(args: Array[String]): Unit = {
Add documentation for the function.
  Helpers.snapStream(cloneSpecs, snapshotRootPath)
}

def buildCloneSpecs(
make it private
Kudos, SonarCloud Quality Gate passed! 0 Bugs. No Coverage information.
Is there a proof for this? Don't just test that it runs -- please test that the data is incrementally appearing in the output, that it looks correct, and that the restore functionality works as expected.
 * @return
 */
private[overwatch] def snap(
  bronze: Bronze,
If you change this to pipelines and let the user pass in a comma-delimited list, it will be easier for customers to use, and it would also allow us to snap any and/or all pipelines.
Suggest you accept something similar to the MultiWorkspace configs (i.e. "bronze,silver,gold"), then split them and get the pipelines and the necessary tables.
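A hedged sketch of that parsing, assuming the valid pipeline names are bronze/silver/gold:

```scala
object PipelineArgs {
  // Assumed set of valid pipeline names
  private val validPipelines = Set("bronze", "silver", "gold")

  // Splits a MultiWorkspace-style comma-delimited argument such as
  // "bronze,silver,gold" into normalized pipeline names, failing fast on
  // anything unrecognized.
  def parsePipelines(arg: String): Seq[String] = {
    val requested = arg.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty).toSeq
    val unknown = requested.filterNot(validPipelines.contains)
    require(unknown.isEmpty, s"Unknown pipelines: ${unknown.mkString(", ")}")
    requested
  }
}
```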
 *
 * @param arg(0) Source Database Name or Source Remote_OverwatchGlobalPath
 * @param arg(1) Target snapshotRootPath
 * @param arg(2) Flag to determine whether the snap is a normal batch process or an incremental one (if "true" then incremental, else normal snap)
Suggest changing this to accept, "incremental" or "full" to make it easier to use. Since batch doesn't
 */

def main(args: Array[String]): Unit = {
Please create variable names for each of the args at the very beginning and then use those variables throughout the code to make it easier to support / debug, i.e.

val source = args(0)
val targetPath = args(1)
...
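Combined with the earlier suggestion to accept "incremental"/"full", the top of main could look roughly like this. Arg positions follow the doc comment in this PR; the case class, names, and defaults are assumptions:

```scala
object SnapshotMain {
  // Assumed container for the named arguments
  case class SnapshotArgs(
    source: String,            // source database name or remote Overwatch path
    snapshotRootPath: String,  // target root for the snapshot
    mode: String,              // "incremental" or "full"
    remoteWorkspaceId: Option[String]
  )

  def parseArgs(args: Array[String]): SnapshotArgs = {
    val source = args(0)
    val snapshotRootPath = args(1)
    val mode = args(2).toLowerCase
    require(mode == "incremental" || mode == "full", s"Invalid mode: $mode")
    val remoteWorkspaceId = if (args.length > 3) Some(args(3)) else None
    SnapshotArgs(source, snapshotRootPath, mode, remoteWorkspaceId)
  }
}
```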
/**
 * Create a backup of the Overwatch datasets
 *
 * @param arg(0) Source Database Name or Source Remote_OverwatchGlobalPath
This comment is very confusing. You're accepting either a source database or a source path prefix; I'm not sure what to do with Source Remote_OverwatchGlobalPath.
val workspace = if (args(0).contains("/")) {
  val remote_workspace_id = args(3)
  val pathToOverwatchGlobalShareData = args(0)
Should the path be the etl_prefix or the etl_prefix/global_share? Perhaps you called this out in the docs; if so, great. Otherwise, please make sure it's very clear, and also be sure to fail if it's not correct. Don't hardcode global_share.
@@ -472,7 +523,9 @@ object Helpers extends SparkSessionWrapper {
       spark.sql(baseCloneStatement)
       CloneReport(cloneSpec, baseCloneStatement, msg)
     }
-    case e: Throwable => CloneReport(cloneSpec, stmt, e.getMessage)
+    case e: Throwable => {
+      CloneReport(cloneSpec, stmt, e.getMessage)
Create an error message using appendStackStrace, similar to below:

val failMsg = PipelineFunctions.appendStackStrace(e)
val msg = s"FAILED: $moduleId-$moduleName Module: API CALL Failed\n$failMsg"
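A self-contained stand-in for that pattern (the real helper is PipelineFunctions.appendStackStrace in Overwatch; this version rebuilds the same idea with java.io and uses hypothetical names):

```scala
import java.io.{PrintWriter, StringWriter}

object ErrorMessages {
  // Renders a throwable's full stack trace to a String
  def appendStackTrace(e: Throwable): String = {
    val sw = new StringWriter()
    e.printStackTrace(new PrintWriter(sw, true))
    sw.toString
  }

  // Failure message for a clone, including the stack trace
  def cloneFailureMsg(source: String, target: String, e: Throwable): String =
    s"FAILED: clone $source --> $target\n${appendStackTrace(e)}"
}
```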
@@ -40,6 +41,7 @@ class InitializeTest extends AnyFunSpec with DataFrameComparer with SparkSession

  it ("initializeDatabase function should create both elt and consumer database") {
    import spark.implicits._
    val sc: SparkContext = spark.sparkContext
please remove this and the import as they are not used and not needed.
  CloneReport(cloneSpec, s"Streaming For: ${cloneSpec.source} --> ${cloneSpec.target}", "SUCCESS")
} catch {
  case e: Throwable if (e.getMessage.contains("is after the latest commit timestamp of")) => {
    val msg = s"SUCCESS WITH WARNINGS: The timestamp provided, ${cloneSpec.asOfTS.get} " +
Use the appendStackStrace method shown in the other comment to build this message with the full stack trace as well.
val sourceName = s"${cloneSpec.source}".split("/").takeRight(1).head
val checkPointLocation = if (snapshotRootPath.takeRight(1) == "/") s"${snapshotRootPath}checkpoint/${sourceName}" else s"${snapshotRootPath}/checkpoint/${sourceName}"
val cloneReportPath = if (snapshotRootPath.takeRight(1) == "/") s"${snapshotRootPath}report/" else s"${snapshotRootPath}/clone_report/"
use the new path slash helpers introduced in #825 -- these are merged to 0720 now so rebase and you should have access to them.
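The helpers from #825 aren't shown in this thread; a minimal stand-in (assumed name, not the real API) that normalizes slashes the way these two lines need would be:

```scala
object PathHelpers {
  // Joins path segments with exactly one slash between them, regardless of
  // whether the root or segments carry leading/trailing slashes.
  def joinPath(root: String, segments: String*): String =
    (root.stripSuffix("/") +: segments.map(_.stripPrefix("/").stripSuffix("/")))
      .mkString("/")
}
// e.g. both joinPath("/mnt/snap/", "checkpoint", "tbl") and
//      joinPath("/mnt/snap", "checkpoint/", "tbl") give "/mnt/snap/checkpoint/tbl"
```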
 * @param cloneDetails details required to execute the parallelized clone
 * @return
 */
private[overwatch] def snapStream(cloneDetails: Seq[CloneDetail], snapshotRootPath: String): Unit = {
Move the getQueryListener code to Helpers and update Database.scala, then reuse it to get a query listener for the stream and provide updates to the logs on query progression, similar to how it's done today in Database.scala. This will provide the user and support with status updates, which are great for longer-running Trigger.Once streams.
overwatch/src/main/scala/com/databricks/labs/overwatch/env/Database.scala
Lines 112 to 133 in 5337cad
private def getQueryListener(query: StreamingQuery, minEventsPerTrigger: Long): StreamingQueryListener = {
  val streamManager = new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
      println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
      println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
      println("Query made progress: " + queryProgress.progress)
      if (config.debugFlag) {
        println(query.status.prettyJson)
      }
      if (queryProgress.progress.numInputRows <= minEventsPerTrigger) {
        query.stop()
      }
    }
  }
  streamManager
}
8c9721c to f744073 (compare)
Duplicate PR already raised from Release-721.