Skip to content

Commit

Permalink
Flink: Port Support inspecting metadata table to Flink 1.14 & 1.15 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hililiwei committed Jan 17, 2023
1 parent 046a81a commit 35151fe
Show file tree
Hide file tree
Showing 24 changed files with 3,742 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -73,6 +74,7 @@
import org.apache.iceberg.flink.util.FlinkPackage;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -149,15 +151,28 @@ public Catalog catalog() {
return icebergCatalog;
}

private Namespace toNamespace(String database) {
/** Append a new level to the base namespace */
private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
String[] namespace = new String[baseNamespace.levels().length + 1];
System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
namespace[baseNamespace.levels().length] = database;
namespace[baseNamespace.levels().length] = newLevel;
return Namespace.of(namespace);
}

TableIdentifier toIdentifier(ObjectPath path) {
return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName());
String objectName = path.getObjectName();
List<String> tableName = Splitter.on('$').splitToList(objectName);

if (tableName.size() == 1) {
return TableIdentifier.of(
appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
} else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
return TableIdentifier.of(
appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
tableName.get(1));
} else {
throw new IllegalArgumentException("Illegal table name:" + objectName);
}
}

@Override
Expand All @@ -183,7 +198,8 @@ public CatalogDatabase getDatabase(String databaseName)
} else {
try {
Map<String, String> metadata =
Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
Maps.newHashMap(
asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName)));
String comment = metadata.remove("comment");
return new CatalogDatabaseImpl(metadata, comment);
} catch (NoSuchNamespaceException e) {
Expand Down Expand Up @@ -214,7 +230,7 @@ private void createDatabase(
throws DatabaseAlreadyExistException, CatalogException {
if (asNamespaceCatalog != null) {
try {
asNamespaceCatalog.createNamespace(toNamespace(databaseName), metadata);
asNamespaceCatalog.createNamespace(appendLevel(baseNamespace, databaseName), metadata);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(getName(), databaseName, e);
Expand Down Expand Up @@ -243,7 +259,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
if (asNamespaceCatalog != null) {
try {
boolean success = asNamespaceCatalog.dropNamespace(toNamespace(name));
boolean success = asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name));
if (!success && !ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
Expand All @@ -265,7 +281,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
if (asNamespaceCatalog != null) {
Namespace namespace = toNamespace(name);
Namespace namespace = appendLevel(baseNamespace, name);
Map<String, String> updates = Maps.newHashMap();
Set<String> removals = Sets.newHashSet();

Expand Down Expand Up @@ -314,7 +330,7 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
try {
return icebergCatalog.listTables(toNamespace(databaseName)).stream()
return icebergCatalog.listTables(appendLevel(baseNamespace, databaseName)).stream()
.map(TableIdentifier::name)
.collect(Collectors.toList());
} catch (NoSuchNamespaceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
Types.StructType struct = convert(flinkSchema).asStruct();
// reassign ids to match the base schema
Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
// reassign doc to match the base schema
schema = TypeUtil.reassignDoc(schema, baseSchema);

// fix types that can't be represented in Flink (UUID)
Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
return freshIdentifierFieldIds(fixedSchema, flinkSchema);
Expand Down
Loading

0 comments on commit 35151fe

Please sign in to comment.