
Commit

Flink: Backport: Flink: Refactor sink tests to use HadoopCatalogResource (apache#6602)
pvary authored and krvikash committed Mar 16, 2023
1 parent 65e0f13 commit a782268
Showing 8 changed files with 210 additions and 129 deletions.
@@ -20,7 +20,11 @@

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
@@ -30,9 +34,9 @@ public class HadoopCatalogResource extends ExternalResource {
protected final String database;
protected final String tableName;

protected HadoopCatalog catalog;
protected Catalog catalog;
protected CatalogLoader catalogLoader;
protected String warehouse;
protected String location;
protected TableLoader tableLoader;

public HadoopCatalogResource(TemporaryFolder temporaryFolder, String database, String tableName) {
@@ -47,23 +51,39 @@ protected void before() throws Throwable {
Assert.assertTrue(warehouseFile.delete());
// before variables
this.warehouse = "file:" + warehouseFile;
Configuration hadoopConf = new Configuration();
this.catalog = new HadoopCatalog(hadoopConf, warehouse);
this.location = String.format("%s/%s/%s", warehouse, database, tableName);
this.tableLoader = TableLoader.fromHadoopTable(location);
this.catalogLoader =
CatalogLoader.hadoop(
"hadoop",
new Configuration(),
ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse));
this.catalog = catalogLoader.loadCatalog();
this.tableLoader =
TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(database, tableName));
}

@Override
protected void after() {}
protected void after() {
try {
catalog.dropTable(TableIdentifier.of(database, tableName));
((HadoopCatalog) catalog).close();
tableLoader.close();
} catch (Exception e) {
throw new RuntimeException("Failed to close catalog resource");
}
}

public TableLoader tableLoader() {
return tableLoader;
}

public HadoopCatalog catalog() {
public Catalog catalog() {
return catalog;
}

public CatalogLoader catalogLoader() {
return catalogLoader;
}

public String warehouse() {
return warehouse;
}
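For context on the change above: the resource used to hand tests a loader pointed directly at a Hadoop table path, and now hands them a loader that resolves the table through a Hadoop catalog. A minimal sketch contrasting the two constructions, using an illustrative warehouse path and table name that are not from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class LoaderStylesSketch {
  public static void main(String[] args) {
    String warehouse = "file:/tmp/iceberg-warehouse"; // illustrative path, not from the commit

    // Old style: load the table straight from its filesystem location.
    TableLoader pathBased = TableLoader.fromHadoopTable(warehouse + "/db/tbl");

    // New style, as HadoopCatalogResource#before() now does: build a CatalogLoader for a
    // Hadoop catalog rooted at the warehouse, then resolve the table by identifier.
    CatalogLoader catalogLoader =
        CatalogLoader.hadoop(
            "hadoop",
            new Configuration(),
            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse));
    TableLoader catalogBased =
        TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "tbl"));
  }
}

Going through the catalog also gives the resource a natural place to drop the table and close the catalog, which is what the new after() override above does.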
@@ -58,18 +58,6 @@ protected void before() throws Throwable {
tableLoader.open();
}

@Override
protected void after() {
try {
catalog.dropTable(TableIdentifier.of(database, tableName));
catalog.close();
tableLoader.close();
} catch (Exception e) {
throw new RuntimeException("Failed to close catalog resource");
}
super.after();
}

public Table table() {
return table;
}
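With dropTable and close now handled in HadoopCatalogResource#after(), the resource changed just above no longer needs its own cleanup override, and the test classes updated below can rely on the rule alone. A condensed, hypothetical sketch of that wiring; the test class name and method body are illustrative, while the fixture constants are the ones this commit uses:

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ExampleSinkTest { // hypothetical test class
  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

  @Rule
  public final HadoopCatalogResource catalogResource =
      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

  @Test
  public void createsAndLoadsTable() {
    // Create the table through the shared catalog; the rule drops it in after().
    Table table =
        catalogResource
            .catalog()
            .createTable(TestFixtures.TABLE_IDENTIFIER, SimpleDataUtil.SCHEMA);

    // Catalog-backed loader for the same table, ready to hand to a Flink sink or source.
    TableLoader tableLoader = catalogResource.tableLoader();
  }
}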
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -36,12 +35,16 @@
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -50,6 +53,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -64,12 +68,15 @@ public class TestFlinkIcebergSink {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Rule
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

private static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
private static final DataFormatConverters.RowConverter CONVERTER =
new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());

private String tablePath;
private Table table;
private StreamExecutionEnvironment env;
private TableLoader tableLoader;
@@ -104,14 +111,16 @@ public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned)

@Before
public void before() throws IOException {
File folder = TEMPORARY_FOLDER.newFolder();
String warehouse = folder.getAbsolutePath();

tablePath = warehouse.concat("/test");
Assert.assertTrue("Should create the table path correctly.", new File(tablePath).mkdir());

Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
table = SimpleDataUtil.createTable(tablePath, props, partitioned);
table =
catalogResource
.catalog()
.createTable(
TestFixtures.TABLE_IDENTIFIER,
SimpleDataUtil.SCHEMA,
partitioned
? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
: PartitionSpec.unpartitioned(),
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));

env =
StreamExecutionEnvironment.getExecutionEnvironment(
@@ -120,7 +129,7 @@ public void before() throws IOException {
.setParallelism(parallelism)
.setMaxParallelism(parallelism);

tableLoader = TableLoader.fromHadoopTable(tablePath);
tableLoader = catalogResource.tableLoader();
}

private List<RowData> convertToRowData(List<Row> rows) {
@@ -148,7 +157,7 @@ public void testWriteRowData() throws Exception {
env.execute("Test Iceberg DataStream");

// Assert the iceberg table's records.
SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}

private List<Row> createRows(String prefix) {
@@ -180,7 +189,7 @@ private void testWriteRow(TableSchema tableSchema, DistributionMode distribution
// Execute the program.
env.execute("Test Iceberg DataStream.");

SimpleDataUtil.assertTableRows(tablePath, convertToRowData(rows));
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}

private int partitionFiles(String partition) throws IOException {
@@ -280,15 +289,31 @@ public void testShuffleByPartitionWithSchema() throws Exception {
public void testTwoSinksInDisjointedDAG() throws Exception {
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());

String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");
Assert.assertTrue("Should create the table path correctly.", new File(leftTablePath).mkdir());
Table leftTable = SimpleDataUtil.createTable(leftTablePath, props, partitioned);
TableLoader leftTableLoader = TableLoader.fromHadoopTable(leftTablePath);

String rightTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/right");
Assert.assertTrue("Should create the table path correctly.", new File(rightTablePath).mkdir());
Table rightTable = SimpleDataUtil.createTable(rightTablePath, props, partitioned);
TableLoader rightTableLoader = TableLoader.fromHadoopTable(rightTablePath);
Table leftTable =
catalogResource
.catalog()
.createTable(
TableIdentifier.of("left"),
SimpleDataUtil.SCHEMA,
partitioned
? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
: PartitionSpec.unpartitioned(),
props);
TableLoader leftTableLoader =
TableLoader.fromCatalog(catalogResource.catalogLoader(), TableIdentifier.of("left"));

Table rightTable =
catalogResource
.catalog()
.createTable(
TableIdentifier.of("right"),
SimpleDataUtil.SCHEMA,
partitioned
? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
: PartitionSpec.unpartitioned(),
props);
TableLoader rightTableLoader =
TableLoader.fromCatalog(catalogResource.catalogLoader(), TableIdentifier.of("right"));

env =
StreamExecutionEnvironment.getExecutionEnvironment(
@@ -330,8 +355,8 @@ public void testTwoSinksInDisjointedDAG() throws Exception {
// Execute the program.
env.execute("Test Iceberg DataStream.");

SimpleDataUtil.assertTableRows(leftTablePath, convertToRowData(leftRows));
SimpleDataUtil.assertTableRows(rightTablePath, convertToRowData(rightRows));
SimpleDataUtil.assertTableRows(leftTable, convertToRowData(leftRows));
SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));

leftTable.refresh();
Assert.assertNull(leftTable.currentSnapshot().summary().get("flink.test"));
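The bodies of testWriteRowData and testWriteRow are outside the visible hunks. Assuming they use the standard FlinkSink builder (as the surrounding assertions suggest), the write path now runs through the catalog-backed loader roughly like the sketch below; the helper name and the fromCollection source are illustrative rather than the exact test code:

import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

class WriteRowsSketch {
  static void writeRows(
      StreamExecutionEnvironment env,
      Table table,
      TableLoader tableLoader,
      List<Row> rows,
      int parallelism)
      throws Exception {
    // Build a bounded stream of rows with the same row type info the test class declares.
    TypeInformation<Row> rowTypeInfo = new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
    DataStream<Row> stream = env.fromCollection(rows, rowTypeInfo);

    FlinkSink.forRow(stream, SimpleDataUtil.FLINK_SCHEMA)
        .table(table)                  // table created through catalogResource.catalog()
        .tableLoader(tableLoader)      // catalog-backed loader from the rule
        .writeParallelism(parallelism)
        .append();

    env.execute("Test Iceberg DataStream");
  }
}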
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -36,12 +35,13 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.HadoopCatalogResource;
import org.apache.iceberg.flink.MiniClusterResource;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestTableLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -53,20 +53,25 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestFlinkIcebergSinkV2 extends TableTestBase {
public class TestFlinkIcebergSinkV2 {

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@Rule
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

private static final int FORMAT_V2 = 2;
private static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
@@ -86,8 +91,9 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
private final boolean partitioned;
private final String writeDistributionMode;

private Table table;
private StreamExecutionEnvironment env;
private TestTableLoader tableLoader;
private TableLoader tableLoader;

@Parameterized.Parameters(
name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
@@ -110,27 +116,28 @@ public static Object[][] parameters() {

public TestFlinkIcebergSinkV2(
String format, int parallelism, boolean partitioned, String writeDistributionMode) {
super(FORMAT_V2);
this.format = FileFormat.fromString(format);
this.parallelism = parallelism;
this.partitioned = partitioned;
this.writeDistributionMode = writeDistributionMode;
}

@Before
public void setupTable() throws IOException {
this.tableDir = temp.newFolder();
this.metadataDir = new File(tableDir, "metadata");
Assert.assertTrue(tableDir.delete());

if (!partitioned) {
table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
} else {
table =
create(
SimpleDataUtil.SCHEMA,
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build());
}
public void setupTable() {
table =
catalogResource
.catalog()
.createTable(
TestFixtures.TABLE_IDENTIFIER,
SimpleDataUtil.SCHEMA,
partitioned
? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
: PartitionSpec.unpartitioned(),
ImmutableMap.of(
TableProperties.DEFAULT_FILE_FORMAT,
format.name(),
TableProperties.FORMAT_VERSION,
String.valueOf(FORMAT_V2)));

table
.updateProperties()
@@ -145,10 +152,10 @@ public void setupTable() throws IOException {
.setParallelism(parallelism)
.setMaxParallelism(parallelism);

tableLoader = new TestTableLoader(tableDir.getAbsolutePath());
tableLoader = catalogResource.tableLoader();
}

private List<Snapshot> findValidSnapshots(Table table) {
private List<Snapshot> findValidSnapshots() {
List<Snapshot> validSnapshots = Lists.newArrayList();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.allManifests(table.io()).stream()
@@ -181,7 +188,7 @@ private void testChangeLogs(
env.execute("Test Iceberg Change-Log DataStream.");

table.refresh();
List<Snapshot> snapshots = findValidSnapshots(table);
List<Snapshot> snapshots = findValidSnapshots();
int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
Assert.assertEquals(
"Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
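TestFlinkIcebergSinkV2 imports IcebergGenerics, Record, and CloseableIterable for its record assertions; the read-back step those assertions depend on looks roughly like the sketch below (the actual assertion helper is outside the visible hunks, so this is an assumption about its shape):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class ReadBackSketch {
  static List<Record> readAll(Table table) throws IOException {
    // Scan the table's current snapshot with the engine-independent generic reader.
    try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
      return Lists.newArrayList(records);
    }
  }
}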