diff --git a/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt index 4f7c158d6181..4f2449764571 100644 --- a/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt @@ -157,7 +157,7 @@ abstract class AzureBlobStorageStreamCopier( } @Throws(Exception::class) - override fun createDestinationTable(): String? { + override fun createDestinationTable(): String { @Suppress("DEPRECATION") val destTableName = nameTransformer.getRawTableName(streamName) LOGGER.info { "Preparing table $destTableName in destination." } sqlOperations.createTableIfNotExists(db, schemaName, destTableName) @@ -167,7 +167,7 @@ abstract class AzureBlobStorageStreamCopier( } @Throws(Exception::class) - override fun generateMergeStatement(destTableName: String?): String { + override fun generateMergeStatement(destTableName: String): String { LOGGER.info { "Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination." } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt index 77a028ff31f7..cddc13956142 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt @@ -80,7 +80,7 @@ interface SqlOperations { * @param tableName Name of table * @return Query */ - fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String + fun truncateTableQuery(database: JdbcDatabase, schemaName: String, tableName: String): String /** * Insert records into table. Assumes the table exists. @@ -96,7 +96,9 @@ interface SqlOperations { database: JdbcDatabase, records: List, schemaName: String?, - tableName: String? + tableName: String?, + syncId: Long, + generationId: Long, ) /** diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt index b3a7c8d756eb..5d1446b48f6f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/StreamCopier.kt @@ -45,10 +45,10 @@ interface StreamCopier { * * @return the name of the destination table */ - @Throws(Exception::class) fun createDestinationTable(): String? + @Throws(Exception::class) fun createDestinationTable(): String /** Generates a merge SQL statement from the temporary table to the final table. */ - @Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String + @Throws(Exception::class) fun generateMergeStatement(destTableName: String): String /** * Cleans up the copier by removing the staging file and dropping the temporary table after diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index dde56527f700..fff521193f45 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -426,6 +426,8 @@ abstract class AbstractJdbcDestination queryList.add( - sqlOperations.truncateTableQuery(database, schemaName, dstTableName) + sqlOperations.truncateTableQuery( + database, + schemaName, + dstTableName, + ) ) 0L -> {} else -> @@ -213,7 +217,9 @@ object JdbcBufferedConsumerFactory { database, ArrayList(records), writeConfig.rawNamespace, - writeConfig.rawTableName + writeConfig.rawTableName, + writeConfig.syncId, + writeConfig.generationId, ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt index 892f6efe200d..d0f39997adbc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcCheckOperations.kt @@ -75,6 +75,8 @@ object JdbcCheckOperations { listOf(dummyRecord), outputSchema, outputTableName, + -1, + -1, ) } } finally { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt index 9f89861f9b32..7e4dbb2bdb33 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.kt @@ -163,9 +163,9 @@ abstract class JdbcSqlOperations : SqlOperations { } override fun truncateTableQuery( - database: JdbcDatabase?, - schemaName: String?, - tableName: String? + database: JdbcDatabase, + schemaName: String, + tableName: String, ): String { return String.format("TRUNCATE TABLE %s.%s;\n", schemaName, tableName) } @@ -225,10 +225,12 @@ abstract class JdbcSqlOperations : SqlOperations { database: JdbcDatabase, records: List, schemaName: String?, - tableName: String? + tableName: String?, + syncId: Long, + generationId: Long ) { if (isDestinationV2) { - insertRecordsInternalV2(database, records, schemaName, tableName) + insertRecordsInternalV2(database, records, schemaName, tableName, syncId, generationId) } else { insertRecordsInternal(database, records, schemaName, tableName) } @@ -247,7 +249,9 @@ abstract class JdbcSqlOperations : SqlOperations { database: JdbcDatabase, records: List, schemaName: String?, - tableName: String? + tableName: String?, + syncId: Long, + generationId: Long, ) companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt index 706e14f2739e..1966159e382f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt @@ -67,7 +67,11 @@ object GeneralStagingFunctions { when (writeConfig.minimumGenerationId) { writeConfig.generationId -> queryList.add( - stagingOperations.truncateTableQuery(database, schema, dstTableName) + stagingOperations.truncateTableQuery( + database, + schema, + dstTableName, + ) ) 0L -> {} else -> diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt index e052d2dd8ddf..a6aa536a6aa1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TestJdbcSqlOperations.kt @@ -26,7 +26,9 @@ class TestJdbcSqlOperations : JdbcSqlOperations() { database: JdbcDatabase, records: List, schemaName: String?, - tableName: String? + tableName: String?, + syncId: Long, + generationId: Long, ) { // Not required for the testing } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt index 9013a119d779..9d15516a02f5 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt @@ -165,7 +165,7 @@ abstract class GcsStreamCopier( } @Throws(Exception::class) - override fun createDestinationTable(): String? { + override fun createDestinationTable(): String { val destTableName = @Suppress("deprecation") nameTransformer.getRawTableName(streamName) LOGGER.info { "Preparing table $destTableName in destination." } sqlOperations.createTableIfNotExists(db, schemaName, destTableName) @@ -175,7 +175,7 @@ abstract class GcsStreamCopier( } @Throws(Exception::class) - override fun generateMergeStatement(destTableName: String?): String { + override fun generateMergeStatement(destTableName: String): String { LOGGER.info { "Preparing to merge tmp table $tmpTableName to dest table: $destTableName, schema: $schemaName, in destination." }