From 4c938d62d791742b9f0c6a77b66fc06a90d7c0ad Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 14 Apr 2023 17:08:29 +0800
Subject: [PATCH] [SPARK-43123][SQL] Internal field metadata should not be leaked to catalogs

### What changes were proposed in this pull request?

Spark defines some internal field metadata to help with query resolution and compilation; for example, several field metadata keys are related to metadata columns. However, this internal field metadata can leak when tables are created. This PR updates the CTAS/RTAS commands to remove the internal field metadata before creating tables. The CREATE/REPLACE TABLE command is fine as-is, since users cannot produce this internal field metadata via the type string.
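As a rough standalone sketch of the stripping step (the object and method names below are made up for illustration, and only a subset of the internal keys is shown; the actual helper added by this PR lives in `org.apache.spark.sql.catalyst.util`):

```scala
import org.apache.spark.sql.types._

// Illustration only: strip Spark-internal field metadata keys from a schema
// before it is handed to a catalog.
object StripInternalMetadataExample {
  // Assumed subset of the internal keys; the PR collects the full list in
  // INTERNAL_METADATA_KEYS.
  private val internalKeys =
    Seq("__autoGeneratedAlias", "__metadata_col", "__qualified_access_only")

  def strip(schema: StructType): StructType = StructType(schema.map { field =>
    var builder = new MetadataBuilder().withMetadata(field.metadata)
    internalKeys.foreach(key => builder = builder.remove(key))
    field.copy(metadata = builder.build())
  })

  def main(args: Array[String]): Unit = {
    // A field whose metadata "leaked" from query compilation.
    val leaked = new MetadataBuilder().putString("__metadata_col", "true").build()
    val schema = StructType(Seq(StructField("index", IntegerType, nullable = false, leaked)))
    println(strip(schema)("index").metadata.json) // prints {}
  }
}
```

The real change applies this stripping right before the CTAS/RTAS code paths convert the query schema into catalog columns.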
### Why are the changes needed?

To avoid potential issues, such as mistakenly treating a data column as a metadata column.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new test.

Closes #40776 from cloud-fan/meta.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/Analyzer.scala      |  4 +-
 .../spark/sql/catalyst/util/package.scala     | 34 ++++++++++++---
 .../command/createDataSourceTables.scala      |  5 ++-
 .../v2/WriteToDataSourceV2Exec.scala          | 42 +++++++++----------
 .../sql/connector/MetadataColumnSuite.scala   | 16 +++++++
 5 files changed, 69 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 126b0618727d4..74592c15d2311 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -492,7 +492,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             case l: Literal => Alias(l, toPrettySQL(l))()
             case e =>
               val metaForAutoGeneratedAlias = new MetadataBuilder()
-                .putString("__autoGeneratedAlias", "true")
+                .putString(AUTO_GENERATED_ALIAS, "true")
                 .build()
               Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index e1ce45c135385..23ddb534af919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -191,12 +191,13 @@ package object util extends Logging {
 
   val METADATA_COL_ATTR_KEY = "__metadata_col"
 
+  /**
+   * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
+   * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
+   */
+  val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
+
   implicit class MetadataColumnHelper(attr: Attribute) {
-    /**
-     * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
-     * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
-     */
-    val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
 
     def isMetadataCol: Boolean = MetadataAttribute.isValid(attr.metadata)
 
@@ -225,4 +226,25 @@
       }
     }
   }
+
+  val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"
+
+  val INTERNAL_METADATA_KEYS = Seq(
+    AUTO_GENERATED_ALIAS,
+    METADATA_COL_ATTR_KEY,
+    QUALIFIED_ACCESS_ONLY,
+    FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+    FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY,
+    FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY
+  )
+
+  def removeInternalMetadata(schema: StructType): StructType = {
+    StructType(schema.map { field =>
+      var builder = new MetadataBuilder().withMetadata(field.metadata)
+      INTERNAL_METADATA_KEYS.foreach { key =>
+        builder = builder.remove(key)
+      }
+      field.copy(metadata = builder.build())
+    })
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index bf14ef14cf463..3848d5505155e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -22,7 +22,7 @@ import java.net.URI
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
     }
     val result = saveDataIntoTable(
       sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
-    val tableSchema = CharVarcharUtils.getRawSchema(result.schema, sessionState.conf)
+    val tableSchema = CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(result.schema), sessionState.conf)
     val newTable = table.copy(
       storage = table.storage.copy(locationUri = tableLocation),
       // We will use the schema of resolved.relation as the schema of the table (instead of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 8355ac8e70366..426f33129a6ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,15 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -69,7 +70,7 @@ case class CreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -78,14 +79,10 @@ case class CreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
-    val table = catalog.createTable(ident, columns,
-      partitioning.toArray, properties.asJava)
+    val table = catalog.createTable(
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -106,7 +103,7 @@ case class AtomicCreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -115,13 +112,10 @@ case class AtomicCreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val stagedTable = catalog.stageCreate(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
 }
@@ -144,7 +138,8 @@ case class ReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -164,10 +159,8 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val table = catalog.createTable(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -192,13 +185,13 @@ case class AtomicReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
   override protected def run(): Seq[InternalRow] = {
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+    val columns = getV2Columns(query.schema)
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
       invalidateCache(catalog, table, ident)
@@ -559,9 +552,14 @@ case class DeltaWithMetadataWritingSparkTask(
   }
 }
 
-private[v2] trait TableWriteExecHelper extends LeafV2CommandExec {
+private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
   override def output: Seq[Attribute] = Nil
 
+  protected def getV2Columns(schema: StructType): Array[Column] = {
+    CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(schema), conf).asNullable)
+  }
+
   protected def writeToTable(
       catalog: TableCatalog,
       table: Table,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index d03bf402170ca..b043bf2f5be23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.types.IntegerType
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
   import testImplicits._
@@ -340,4 +343,17 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       assert(relations(0).output != relations(1).output)
     }
   }
+
+  test("SPARK-43123: Metadata column related field metadata should not be leaked to catalogs") {
+    withTable(tbl, "testcat.target") {
+      prepareTable()
+      sql(s"CREATE TABLE testcat.target AS SELECT index FROM $tbl")
+      val cols = catalog("testcat").asTableCatalog.loadTable(
+        Identifier.of(Array.empty, "target")).columns()
+      assert(cols.length == 1)
+      assert(cols.head.name() == "index")
+      assert(cols.head.dataType() == IntegerType)
+      assert(cols.head.metadataInJSON() == null)
+    }
+  }
 }