cdk-java: add fields to WriteConfig (#41952)
stephane-airbyte committed Jul 22, 2024
1 parent d99aacc commit 0c8b3dd
Showing 17 changed files with 151 additions and 145 deletions.
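
Summary: WriteConfig drops its DestinationSyncMode field in favor of typing/deduping metadata (postImportAction of type ImportType, plus syncId, generationId, minimumGenerationId, and rawTableSuffix), and its outputSchemaName/outputTableName properties are renamed to rawNamespace/rawTableName. Overwrite behavior is now derived from generation IDs rather than a dedicated sync mode; callers in the staging and JDBC code are updated accordingly.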
JdbcBufferedConsumerFactory:

@@ -90,7 +90,7 @@ object JdbcBufferedConsumerFactory {
         namingResolver: NamingConventionTransformer,
         config: JsonNode,
         schemaRequired: Boolean,
-        parsedCatalog: ParsedCatalog
+        parsedCatalog: ParsedCatalog,
     ): List<WriteConfig> {
         if (schemaRequired) {
             Preconditions.checkState(
@@ -99,12 +99,13 @@ object JdbcBufferedConsumerFactory {
             )
         }
         return parsedCatalog.streams
-            .map { parsedStreamToWriteConfig(namingResolver).apply(it) }
+            .map { parsedStreamToWriteConfig(namingResolver, rawTableSuffix = "").apply(it) }
             .toList()
     }
 
     private fun parsedStreamToWriteConfig(
-        namingResolver: NamingConventionTransformer
+        namingResolver: NamingConventionTransformer,
+        rawTableSuffix: String,
     ): Function<StreamConfig, WriteConfig> {
         return Function { streamConfig: StreamConfig ->
             // TODO We should probably replace WriteConfig with StreamConfig?
@@ -119,7 +120,11 @@ object JdbcBufferedConsumerFactory {
                 @Suppress("deprecation")
                 namingResolver.getTmpTableName(streamConfig.id.rawNamespace),
                 streamConfig.id.rawName,
-                streamConfig.destinationSyncMode,
+                streamConfig.postImportAction,
+                streamConfig.syncId,
+                streamConfig.generationId,
+                streamConfig.minimumGenerationId,
+                rawTableSuffix
             )
         }
     }
@@ -150,23 +155,22 @@ object JdbcBufferedConsumerFactory {
         }
         val queryList: MutableList<String> = ArrayList()
         for (writeConfig in writeConfigs) {
-            val schemaName = writeConfig.outputSchemaName
-            val dstTableName = writeConfig.outputTableName
+            val schemaName = writeConfig.rawNamespace
+            val dstTableName = writeConfig.rawTableName
             LOGGER.info {
                 "Preparing raw table in destination started for stream ${writeConfig.streamName}. schema: $schemaName, table name: $dstTableName"
             }
             sqlOperations.createSchemaIfNotExists(database, schemaName)
             sqlOperations.createTableIfNotExists(database, schemaName, dstTableName)
-            when (writeConfig.syncMode) {
-                DestinationSyncMode.OVERWRITE ->
+            when (writeConfig.minimumGenerationId) {
+                writeConfig.generationId ->
                     queryList.add(
                         sqlOperations.truncateTableQuery(database, schemaName, dstTableName)
                     )
-                DestinationSyncMode.APPEND,
-                DestinationSyncMode.APPEND_DEDUP -> {}
+                0L -> {}
                 else ->
                     throw IllegalStateException(
-                        "Unrecognized sync mode: " + writeConfig.syncMode
+                        "Invalid minimumGenerationId ${writeConfig.minimumGenerationId} for stream ${writeConfig.streamName}. generationId=${writeConfig.generationId}"
                     )
             }
         }
@@ -208,8 +212,8 @@ object JdbcBufferedConsumerFactory {
                 sqlOperations.insertRecords(
                     database,
                     ArrayList(records),
-                    writeConfig.outputSchemaName,
-                    writeConfig.outputTableName
+                    writeConfig.rawNamespace,
+                    writeConfig.rawTableName
                 )
             }
         }
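Note the new truncation rule above: whether the raw table is truncated is now decided by comparing generation IDs instead of matching DestinationSyncMode.OVERWRITE. A minimal standalone sketch of that rule (the helper name is hypothetical, not part of this commit):

    // minimumGenerationId == generationId: nothing from earlier generations may
    // survive this sync, so truncate the raw table up front (old OVERWRITE path).
    // minimumGenerationId == 0: all prior data is retained (old APPEND and
    // APPEND_DEDUP paths). Any other combination is rejected.
    fun shouldTruncateRawTable(minimumGenerationId: Long, generationId: Long): Boolean =
        when (minimumGenerationId) {
            generationId -> true
            0L -> false
            else -> throw IllegalStateException(
                "Invalid minimumGenerationId $minimumGenerationId (generationId=$generationId)"
            )
        }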
WriteConfig:

@@ -3,14 +3,14 @@
  */
 package io.airbyte.cdk.integrations.destination.jdbc
 
-import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.airbyte.integrations.base.destination.typing_deduping.ImportType
 import java.time.Instant
 
 /**
  * Write configuration POJO (plain old java object) for all destinations extending
  * [AbstractJdbcDestination].
  */
-class WriteConfig
+data class WriteConfig
 @JvmOverloads
 constructor(
     val streamName: String,
@@ -20,26 +20,13 @@ constructor(
      * @return
      */
     val namespace: String?,
-    val outputSchemaName: String,
+    val rawNamespace: String,
     val tmpTableName: String?,
-    val outputTableName: String?,
-    val syncMode: DestinationSyncMode,
-    val writeDatetime: Instant = Instant.now()
-) {
-    override fun toString(): String {
-        return "WriteConfig{" +
-            "streamName=" +
-            streamName +
-            ", namespace=" +
-            namespace +
-            ", outputSchemaName=" +
-            outputSchemaName +
-            ", tmpTableName=" +
-            tmpTableName +
-            ", outputTableName=" +
-            outputTableName +
-            ", syncMode=" +
-            syncMode +
-            '}'
-    }
-}
+    val rawTableName: String,
+    val postImportAction: ImportType,
+    val syncId: Long,
+    val generationId: Long,
+    val minimumGenerationId: Long,
+    val rawTableSuffix: String,
+    val writeDatetime: Instant = Instant.now(),
+)
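Making WriteConfig a data class also removes the hand-written toString() shown above; equals, hashCode, and toString are now generated. A construction sketch using the new fields (all argument values are hypothetical):

    import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
    import io.airbyte.integrations.base.destination.typing_deduping.ImportType

    val config = WriteConfig(
        streamName = "users",
        namespace = "source_schema",
        rawNamespace = "airbyte_internal",
        tmpTableName = null,
        rawTableName = "raw_users",
        postImportAction = ImportType.DEDUPE,
        syncId = 42L,
        generationId = 3L,
        minimumGenerationId = 0L, // 0 = keep earlier generations (append semantics)
        rawTableSuffix = "",
        // writeDatetime defaults to Instant.now() via the @JvmOverloads constructor.
    )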
SQL generator (rawTableCondition and typing/deduping SQL):

@@ -7,21 +7,11 @@ import com.google.common.annotations.VisibleForTesting
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
-import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
-import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
+import io.airbyte.integrations.base.destination.typing_deduping.*
 import io.airbyte.integrations.base.destination.typing_deduping.Array
-import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
-import io.airbyte.integrations.base.destination.typing_deduping.Sql
 import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.of
 import io.airbyte.integrations.base.destination.typing_deduping.Sql.Companion.transactionally
-import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
-import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
-import io.airbyte.integrations.base.destination.typing_deduping.StreamId
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
-import io.airbyte.integrations.base.destination.typing_deduping.Struct
-import io.airbyte.integrations.base.destination.typing_deduping.Union
-import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import java.sql.Timestamp
 import java.time.Instant
 import java.util.Locale
@@ -237,13 +227,13 @@ constructor(
 
     @VisibleForTesting
     fun rawTableCondition(
-        syncMode: DestinationSyncMode,
+        postImportAction: ImportType,
         isCdcDeletedAtPresent: Boolean,
         minRawTimestamp: Optional<Instant>
     ): Condition {
         var condition: Condition =
             DSL.field(DSL.name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)).isNull()
-        if (syncMode == DestinationSyncMode.APPEND_DEDUP) {
+        if (postImportAction == ImportType.DEDUPE) {
             if (isCdcDeletedAtPresent) {
                 condition = condition.or(cdcDeletedAtNotNullCondition())
             }
@@ -447,7 +437,7 @@ constructor(
                 streamConfig.columns,
                 getFinalTableMetaColumns(false),
                 rawTableCondition(
-                    streamConfig.destinationSyncMode,
+                    streamConfig.postImportAction,
                     streamConfig.columns.containsKey(cdcDeletedAtColumn),
                     minRawTimestamp,
                 ),
@@ -508,7 +498,7 @@ constructor(
             else ""
         val checkpointStmt = checkpointRawTable(rawSchema, rawTable, minRawTimestamp)
 
-        if (streamConfig.destinationSyncMode != DestinationSyncMode.APPEND_DEDUP) {
+        if (streamConfig.postImportAction == ImportType.APPEND) {
             return transactionally(insertStmt, checkpointStmt)
         }
 
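Two details in this file are easy to miss. First, rawTableCondition now branches on ImportType.DEDUPE where it used to check DestinationSyncMode.APPEND_DEDUP. Second, the early-return guard is rewritten from "destinationSyncMode != APPEND_DEDUP" to "postImportAction == ImportType.APPEND"; the two are equivalent only because ImportType has just two values. A sketch of the assumed enum (it is defined elsewhere and not shown in this diff):

    // Assumed shape of ImportType in the typing_deduping package.
    enum class ImportType { APPEND, DEDUPE }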
GeneralStagingFunctions:

@@ -9,7 +9,6 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction
 import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
 import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
 import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.airbyte.protocol.models.v0.StreamDescriptor
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
@@ -44,16 +43,16 @@ object GeneralStagingFunctions {
         // Create raw tables
         val queryList: MutableList<String> = ArrayList()
         for (writeConfig in writeConfigs) {
-            val schema = writeConfig.outputSchemaName
+            val schema = writeConfig.rawNamespace
             val stream = writeConfig.streamName
-            val dstTableName = writeConfig.outputTableName
+            val dstTableName = writeConfig.rawTableName
             val stageName = stagingOperations.getStageName(schema, dstTableName)
             val stagingPath =
                 stagingOperations.getStagingPath(
                     RANDOM_CONNECTION_ID,
                     schema,
                     stream,
-                    writeConfig.outputTableName,
+                    writeConfig.rawTableName,
                     writeConfig.writeDatetime
                 )
 
@@ -65,16 +64,15 @@ object GeneralStagingFunctions {
             stagingOperations.createTableIfNotExists(database, schema, dstTableName)
             stagingOperations.createStageIfNotExists(database, stageName)
 
-            when (writeConfig.syncMode) {
-                DestinationSyncMode.OVERWRITE ->
+            when (writeConfig.minimumGenerationId) {
+                writeConfig.generationId ->
                     queryList.add(
                         stagingOperations.truncateTableQuery(database, schema, dstTableName)
                     )
-                DestinationSyncMode.APPEND,
-                DestinationSyncMode.APPEND_DEDUP -> {}
+                0L -> {}
                 else ->
                     throw IllegalStateException(
-                        "Unrecognized sync mode: " + writeConfig.syncMode
+                        "Invalid minGenerationId ${writeConfig.minimumGenerationId} for stream ${writeConfig.streamName}. GenerationId=${writeConfig.generationId}"
                     )
             }
             log.info {
@@ -143,16 +141,16 @@ object GeneralStagingFunctions {
         log.info { "Cleaning up destination started for ${writeConfigs.size} streams" }
         typerDeduper.typeAndDedupe(streamSyncSummaries)
         for (writeConfig in writeConfigs) {
-            val schemaName = writeConfig.outputSchemaName
+            val schemaName = writeConfig.rawNamespace
             if (purgeStagingData) {
                 val stageName =
-                    stagingOperations.getStageName(schemaName, writeConfig.outputTableName)
+                    stagingOperations.getStageName(schemaName, writeConfig.rawTableName)
                 val stagePath =
                     stagingOperations.getStagingPath(
                         RANDOM_CONNECTION_ID,
                         schemaName,
                         writeConfig.streamName,
-                        writeConfig.outputTableName,
+                        writeConfig.rawTableName,
                         writeConfig.writeDatetime
                     )
                 log.info {
SerialFlush:

@@ -88,14 +88,14 @@ object SerialFlush {
         }
 
         val writeConfig = pairToWriteConfig.getValue(pair)
-        val schemaName = writeConfig.outputSchemaName
-        val stageName = stagingOperations.getStageName(schemaName, writeConfig.outputTableName)
+        val schemaName = writeConfig.rawNamespace
+        val stageName = stagingOperations.getStageName(schemaName, writeConfig.rawTableName)
         val stagingPath =
             stagingOperations.getStagingPath(
                 RANDOM_CONNECTION_ID,
                 schemaName,
                 writeConfig.streamName,
-                writeConfig.outputTableName,
+                writeConfig.rawTableName,
                 writeConfig.writeDatetime
             )
         try {
@@ -114,7 +114,7 @@ object SerialFlush {
                 stageName,
                 stagingPath,
                 listOf(stagedFile),
-                writeConfig.outputTableName,
+                writeConfig.rawTableName,
                 schemaName,
                 stagingOperations,
             )
SerialStagingConsumerFactoryTest:

@@ -24,22 +24,29 @@ internal class SerialStagingConsumerFactoryTest {
                     "source_schema",
                     "destination_default_schema",
                     null,
-                    null,
-                    mock()
+                    "output_table_name",
+                    mock(),
+                    -1L,
+                    -1L,
+                    -1L,
+                    "",
                 ),
                 WriteConfig(
                     "example_stream",
                     "source_schema",
                     "destination_default_schema",
                     null,
-                    null,
-                    mock()
+                    "output_table_name",
+                    mock(),
+                    -1L,
+                    -1L,
+                    -1L,
+                    "",
                 )
             ),
             mock(),
         )
     }
 
     Assertions.assertEquals(
         "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using \${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. Affected streams: source_schema.example_stream, source_schema.example_stream",
         configErrorException.message
AsyncFlush:

@@ -86,14 +86,14 @@ internal class AsyncFlush(
         }
 
         val writeConfig: WriteConfig = streamDescToWriteConfig.getValue(streamDescriptor)
-        val schemaName: String = writeConfig.outputSchemaName
-        val stageName = stagingOperations!!.getStageName(schemaName, writeConfig.outputTableName)
+        val schemaName: String = writeConfig.rawNamespace
+        val stageName = stagingOperations!!.getStageName(schemaName, writeConfig.rawTableName)
         val stagingPath =
             stagingOperations.getStagingPath(
                 GeneralStagingFunctions.RANDOM_CONNECTION_ID,
                 schemaName,
                 writeConfig.streamName,
-                writeConfig.outputTableName,
+                writeConfig.rawTableName,
                 writeConfig.writeDatetime
             )
         try {
@@ -110,7 +110,7 @@ internal class AsyncFlush(
                 stageName,
                 stagingPath,
                 listOf(stagedFile),
-                writeConfig.outputTableName,
+                writeConfig.rawTableName,
                 schemaName,
                 stagingOperations,
             )
BaseDestinationV1V2Migrator:

@@ -4,7 +4,6 @@
 package io.airbyte.integrations.base.destination.typing_deduping
 
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
-import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.*
 
@@ -44,8 +43,7 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : DestinationV1V2Migrator {
         LOGGER.info {
             "Checking whether v1 raw table ${v1RawTable.tableName} in dataset ${v1RawTable.namespace} exists"
         }
-        val syncModeNeedsMigration =
-            isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode)
+        val syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig)
         val noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig)
         val aValidV1RawTableExists =
             doesValidV1RawTableExist(v1RawTable.namespace, v1RawTable.tableName)
@@ -139,8 +137,8 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : DestinationV1V2Migrator {
      * @param destinationSyncMode destination sync mode
      * @return whether this is full refresh overwrite
      */
-    private fun isMigrationRequiredForSyncMode(destinationSyncMode: DestinationSyncMode?): Boolean {
-        return DestinationSyncMode.OVERWRITE != destinationSyncMode
+    private fun isMigrationRequiredForSyncMode(streamConfig: StreamConfig): Boolean {
+        return streamConfig.minimumGenerationId == 0L
     }
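The migration predicate flips from "migrate unless OVERWRITE" to "migrate when minimumGenerationId == 0": migrating v1 raw records only pays off when the sync preserves existing data, and a truncate-style sync (minimumGenerationId == generationId) would discard them anyway. Note that the KDoc above the function still refers to destinationSyncMode and full refresh overwrite; this change leaves it stale.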
Catalog parser (StreamConfig construction):

@@ -175,7 +175,11 @@ constructor(
 
         return StreamConfig(
             sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
-            stream.destinationSyncMode,
+            when (stream.destinationSyncMode!!) {
+                DestinationSyncMode.APPEND,
+                DestinationSyncMode.OVERWRITE -> ImportType.APPEND
+                DestinationSyncMode.APPEND_DEDUP -> ImportType.DEDUPE
+            },
             primaryKey,
             cursor,
             columns,
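At catalog-parse time the protocol's three destination sync modes collapse into the two ImportType values, with overwrite semantics carried by generation IDs instead. A hedged standalone restatement of the mapping (imports assumed; the when expression above is the authoritative version):

    import io.airbyte.integrations.base.destination.typing_deduping.ImportType
    import io.airbyte.protocol.models.v0.DestinationSyncMode

    // OVERWRITE no longer gets its own post-import behavior: it becomes a plain
    // APPEND whose replace-everything semantics come from generation IDs.
    fun toImportType(mode: DestinationSyncMode): ImportType =
        when (mode) {
            DestinationSyncMode.APPEND,
            DestinationSyncMode.OVERWRITE -> ImportType.APPEND
            DestinationSyncMode.APPEND_DEDUP -> ImportType.DEDUPE
        }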
(Diff truncated: the remaining 8 of the 17 changed files did not load in this view.)