Skip to content

Commit

Permalink
destination-mysql: implement refreshes
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Jul 16, 2024
1 parent 283b214 commit 3934d7b
Show file tree
Hide file tree
Showing 33 changed files with 198 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolT
import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.github.oshai.kotlinlogging.KotlinLogging
import java.sql.SQLException
import java.util.*
import org.jooq.*
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.SQLDataType

private val LOGGER = KotlinLogging.logger { }

abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestinationState> :
BaseSqlGeneratorIntegrationTest<DestinationState>() {
protected abstract val database: JdbcDatabase
Expand Down Expand Up @@ -82,6 +85,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}
)
}
LOGGER.info {"executing SQL statement: ${insert.sql}"}
database.execute(insert.getSQL(ParamType.INLINED))
}

Expand Down Expand Up @@ -125,7 +129,8 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
sqlGenerator.columns.rawColumns,
records,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
COLUMN_NAME_AB_META,
COLUMN_NAME_AB_GENERATION_ID,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
{
"_airbyte_raw_id": "5ce60e70-98aa-4fe3-8159-67207352c4f0",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {"id1": 1, "id2": 100}
"_airbyte_data": {"id1": 1, "id2": 100},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -611,7 +612,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
{
"_airbyte_raw_id": "899d3bc3-7921-44f0-8517-c748a28fe338",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {}
"_airbyte_data": {},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand All @@ -621,7 +623,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
{
"_airbyte_raw_id": "47f46eb6-fcae-469c-a7fc-31d4b9ce7474",
"_airbyte_extracted_at": "2023-01-02T00:00:00Z",
"_airbyte_data": {}
"_airbyte_data": {},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -675,7 +678,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
{
"_airbyte_raw_id": "899d3bc3-7921-44f0-8517-c748a28fe338",
"_airbyte_extracted_at": "2023-01-01T12:00:00Z",
"_airbyte_data": {}
"_airbyte_data": {},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -825,7 +829,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"_airbyte_extracted_at": "2022-01-01T00:00:00Z",
"_airbyte_data": {
"string": "foo"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand All @@ -837,7 +842,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"_airbyte_extracted_at": "2023-01-01T01:00:00Z",
"_airbyte_data": {
"string": "bar"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -908,7 +914,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina

@Test
@Throws(Exception::class)
fun timestampFormats() {
open fun timestampFormats() {
createRawTable(streamId)
createFinalTable(incrementalAppendStream, "")
insertRawTableRecords(
Expand Down Expand Up @@ -1002,7 +1008,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"id1": 1,
"id2": 100,
"string": "foo"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand All @@ -1016,7 +1023,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"id1": 1,
"id2": 100,
"string": "bar"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -1118,7 +1126,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"id2": 100,
"updated_at": "2023-01-01T00:00:00Z",
"_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -1159,7 +1168,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"id2": 100,
"updated_at": "2023-01-01T00:00:00Z",
"_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -1316,7 +1326,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
*/
@Test
@Throws(Exception::class)
fun softReset() {
open fun softReset() {
createRawTable(streamId)
createFinalTable(cdcIncrementalAppendStream, "")
insertRawTableRecords(
Expand All @@ -1332,7 +1342,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"id1": 1,
"id2": 100,
"_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"
}
},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -1459,7 +1470,9 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
"_airbyte_extracted_at",
"2023-01-01T00:00:00Z",
"_airbyte_data",
java.util.Map.of(str, "bar")
java.util.Map.of(str, "bar"),
"_airbyte_generation_id",
"42"
)
)
)
Expand Down Expand Up @@ -1542,7 +1555,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
{
"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {}
"_airbyte_data": {},
"_airbyte_generation_id": 42
}
""".trimIndent()
Expand Down Expand Up @@ -1842,7 +1856,8 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
mapOf(
"_airbyte_raw_id" to "ad3e8c84-e02e-4df4-b146-3d5a007b21b4",
"_airbyte_extracted_at" to "2023-01-01T00:00:00Z",
"_airbyte_data" to mapOf(columnName1 to "foo", columnName2 to "bar")
"_airbyte_data" to mapOf(columnName1 to "foo", columnName2 to "bar"),
"_airbyte_generation_id" to 42,
)
)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// First batch
{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "alice"}}
{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "alice"}, "_airbyte_generation_id": 42}
// Second batch - this is an outdated deletion record, which should be ignored
{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}}
{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}, "_airbyte_generation_id": 42}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Write raw deletion record from the first batch, which resulted in an empty final table.
// Note the non-null loaded_at - this is to simulate that we previously ran T+D on this record.
{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}}
{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}, "_airbyte_generation_id": 42}
// insert raw record from the second record batch - this is an outdated record that should be ignored.
{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "string": "alice"}}
{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "string": "alice"}, "_airbyte_generation_id": 42}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Records from the first sync (note the non-null loaded_at value)
{"_airbyte_raw_id": "d5790c04-52df-42f3-8f77-a543268822a7", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2022-12-01T00:00:00Z", "string": "spooky ghost", "_ab_cdc_deleted_at": null}}
{"_airbyte_raw_id": "3593a002-3ab2-4e67-8b4a-e62f0f9a26f9", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2022-12-01T01:00:00Z", "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:00:00Z"}}
{"_airbyte_raw_id": "e3b03d92-0f7c-49e5-b203-573dbb7bd1cb", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2022-12-01T02:00:00Z", "string": "will be deleted", "_ab_cdc_deleted_at": null}}
{"_airbyte_raw_id": "687718e4-a2a9-4233-80a9-9671f83d61ae", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 6, "id2": 100, "updated_at": "2022-12-01T03:00:00Z", "string": "should be untouched", "_ab_cdc_deleted_at": null}}
{"_airbyte_raw_id": "d5790c04-52df-42f3-8f77-a543268822a7", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2022-12-01T00:00:00Z", "string": "spooky ghost", "_ab_cdc_deleted_at": null}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "3593a002-3ab2-4e67-8b4a-e62f0f9a26f9", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2022-12-01T01:00:00Z", "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:00:00Z"}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "e3b03d92-0f7c-49e5-b203-573dbb7bd1cb", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2022-12-01T02:00:00Z", "string": "will be deleted", "_ab_cdc_deleted_at": null}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "687718e4-a2a9-4233-80a9-9671f83d61ae", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 6, "id2": 100, "updated_at": "2022-12-01T03:00:00Z", "string": "should be untouched", "_ab_cdc_deleted_at": null}, "_airbyte_generation_id": 42}

// Records from the second sync
{"_airbyte_raw_id": "5f959152-0db0-44b9-b7e4-0d5c44dc2664", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice"}}
{"_airbyte_raw_id": "a182ff97-8868-42b9-b3cf-c0753fba55e1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice2"}}
{"_airbyte_raw_id": "65a6c31f-9ded-4e3d-9339-38ee85b0ae81", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "_ab_cdc_deleted_at": null, "string": "bob"}}
{"_airbyte_raw_id": "f7fffb67-cd05-4cf7-bcd9-00f2fe796168", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T04:00:00Z", "_ab_cdc_deleted_at": "2022-12-31T23:59:59Z"}}
{"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-01T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}}
{"_airbyte_raw_id": "5f959152-0db0-44b9-b7e4-0d5c44dc2664", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice"}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "a182ff97-8868-42b9-b3cf-c0753fba55e1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice2"}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "65a6c31f-9ded-4e3d-9339-38ee85b0ae81", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "_ab_cdc_deleted_at": null, "string": "bob"}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "f7fffb67-cd05-4cf7-bcd9-00f2fe796168", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T04:00:00Z", "_ab_cdc_deleted_at": "2022-12-31T23:59:59Z"}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-01T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}, "_airbyte_generation_id": 42}
// CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset.
{"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T06:00:00Z", "string": "charlie"}}
{"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T06:00:00Z", "string": "charlie"}, "_airbyte_generation_id": 42}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}}
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}, "_airbyte_generation_id": 42}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}, "_airbyte_generation_id": 42}
Loading

0 comments on commit 3934d7b

Please sign in to comment.