Skip to content

Commit

Permalink
CDK-java: adding a generationId to SqlOperations.truncateTableQuery (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Jul 22, 2024
1 parent 0c8b3dd commit b1b5190
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun createDestinationTable(): String? {
override fun createDestinationTable(): String {
@Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
Expand All @@ -167,7 +167,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
override fun generateMergeStatement(destTableName: String): String {
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ interface SqlOperations {
* @param tableName Name of table
* @return Query
*/
fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String
fun truncateTableQuery(database: JdbcDatabase, schemaName: String, tableName: String): String

/**
* Insert records into table. Assumes the table exists.
Expand All @@ -96,7 +96,9 @@ interface SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ interface StreamCopier {
*
* @return the name of the destination table
*/
@Throws(Exception::class) fun createDestinationTable(): String?
@Throws(Exception::class) fun createDestinationTable(): String

/** Generates a merge SQL statement from the temporary table to the final table. */
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String
@Throws(Exception::class) fun generateMergeStatement(destTableName: String): String

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
listOf(dummyRecord),
outputSchema,
outputTableName,
-1,
-1,
)
}
} finally {
Expand Down Expand Up @@ -456,7 +458,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L),
.withEmittedAt(1602637589000L)
)
.withSerialized(dummyDataToInsert.toString())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ object JdbcBufferedConsumerFactory {
when (writeConfig.minimumGenerationId) {
writeConfig.generationId ->
queryList.add(
sqlOperations.truncateTableQuery(database, schemaName, dstTableName)
sqlOperations.truncateTableQuery(
database,
schemaName,
dstTableName,
)
)
0L -> {}
else ->
Expand Down Expand Up @@ -213,7 +217,9 @@ object JdbcBufferedConsumerFactory {
database,
ArrayList(records),
writeConfig.rawNamespace,
writeConfig.rawTableName
writeConfig.rawTableName,
writeConfig.syncId,
writeConfig.generationId,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ object JdbcCheckOperations {
listOf(dummyRecord),
outputSchema,
outputTableName,
-1,
-1,
)
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ abstract class JdbcSqlOperations : SqlOperations {
}

override fun truncateTableQuery(
database: JdbcDatabase?,
schemaName: String?,
tableName: String?
database: JdbcDatabase,
schemaName: String,
tableName: String,
): String {
return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName)
}
Expand Down Expand Up @@ -225,10 +225,12 @@ abstract class JdbcSqlOperations : SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long
) {
if (isDestinationV2) {
insertRecordsInternalV2(database, records, schemaName, tableName)
insertRecordsInternalV2(database, records, schemaName, tableName, syncId, generationId)
} else {
insertRecordsInternal(database, records, schemaName, tableName)
}
Expand All @@ -247,7 +249,9 @@ abstract class JdbcSqlOperations : SqlOperations {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ object GeneralStagingFunctions {
when (writeConfig.minimumGenerationId) {
writeConfig.generationId ->
queryList.add(
stagingOperations.truncateTableQuery(database, schema, dstTableName)
stagingOperations.truncateTableQuery(
database,
schema,
dstTableName,
)
)
0L -> {}
else ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class TestJdbcSqlOperations : JdbcSqlOperations() {
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
tableName: String?,
syncId: Long,
generationId: Long,
) {
// Not required for the testing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ abstract class GcsStreamCopier(
}

@Throws(Exception::class)
override fun createDestinationTable(): String? {
override fun createDestinationTable(): String {
val destTableName = @Suppress("deprecation") nameTransformer.getRawTableName(streamName)
LOGGER.info { "Preparing table $destTableName in destination." }
sqlOperations.createTableIfNotExists(db, schemaName, destTableName)
Expand All @@ -175,7 +175,7 @@ abstract class GcsStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String {
override fun generateMergeStatement(destTableName: String): String {
LOGGER.info {
"Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination."
}
Expand Down

0 comments on commit b1b5190

Please sign in to comment.