Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK-java: adding a generationId to SqlOperations.truncateTableQuery #41953

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun createDestinationTable(): String? {
override fun createDestinationTable(): String {
@Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
Expand All @@ -167,7 +167,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
override fun generateMergeStatement(destTableName: String): String {
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ interface SqlOperations {
* @param tableName Name of table
* @return Query
*/
fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String
fun truncateTableQuery(database: JdbcDatabase, schemaName: String, tableName: String): String

/**
* Insert records into table. Assumes the table exists.
Expand All @@ -96,7 +96,9 @@ interface SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ interface StreamCopier {
*
* @return the name of the destination table
*/
@Throws(Exception::class) fun createDestinationTable(): String?
@Throws(Exception::class) fun createDestinationTable(): String

/** Generates a merge SQL statement from the temporary table to the final table. */
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String
@Throws(Exception::class) fun generateMergeStatement(destTableName: String): String

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
listOf(dummyRecord),
outputSchema,
outputTableName,
-1,
-1,
)
}
} finally {
Expand Down Expand Up @@ -456,7 +458,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L),
.withEmittedAt(1602637589000L)
)
.withSerialized(dummyDataToInsert.toString())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ object JdbcBufferedConsumerFactory {
when (writeConfig.minimumGenerationId) {
writeConfig.generationId ->
queryList.add(
sqlOperations.truncateTableQuery(database, schemaName, dstTableName)
sqlOperations.truncateTableQuery(
database,
schemaName,
dstTableName,
)
)
0L -> {}
else ->
Expand Down Expand Up @@ -213,7 +217,9 @@ object JdbcBufferedConsumerFactory {
database,
ArrayList(records),
writeConfig.rawNamespace,
writeConfig.rawTableName
writeConfig.rawTableName,
writeConfig.syncId,
writeConfig.generationId,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ object JdbcCheckOperations {
listOf(dummyRecord),
outputSchema,
outputTableName,
-1,
-1,
)
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ abstract class JdbcSqlOperations : SqlOperations {
}

override fun truncateTableQuery(
database: JdbcDatabase?,
schemaName: String?,
tableName: String?
database: JdbcDatabase,
schemaName: String,
tableName: String,
): String {
return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName)
}
Expand Down Expand Up @@ -225,10 +225,12 @@ abstract class JdbcSqlOperations : SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long
) {
if (isDestinationV2) {
insertRecordsInternalV2(database, records, schemaName, tableName)
insertRecordsInternalV2(database, records, schemaName, tableName, syncId, generationId)
} else {
insertRecordsInternal(database, records, schemaName, tableName)
}
Expand All @@ -247,7 +249,9 @@ abstract class JdbcSqlOperations : SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ object GeneralStagingFunctions {
when (writeConfig.minimumGenerationId) {
writeConfig.generationId ->
queryList.add(
stagingOperations.truncateTableQuery(database, schema, dstTableName)
stagingOperations.truncateTableQuery(
database,
schema,
dstTableName,
)
)
0L -> {}
else ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class TestJdbcSqlOperations : JdbcSqlOperations() {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
) {
// Not required for the testing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ abstract class GcsStreamCopier(
}

@Throws(Exception::class)
override fun createDestinationTable(): String? {
override fun createDestinationTable(): String {
val destTableName = @Suppress("deprecation") nameTransformer.getRawTableName(streamName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
Expand All @@ -175,7 +175,7 @@ abstract class GcsStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
override fun generateMergeStatement(destTableName: String): String {
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
Expand Down
Loading