[SPARK-43123][SQL] Internal field metadata should not be leaked to catalogs

### What changes were proposed in this pull request?

In Spark, we have defined internal field metadata to help with query resolution and compilation. For example, several field metadata keys are related to metadata columns.

However, this internal field metadata can be leaked to catalogs when we create tables. This PR updates the CTAS/RTAS commands to remove the internal field metadata before creating tables. The CREATE/REPLACE TABLE command is fine, as users cannot produce this internal field metadata via the type string.
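
A minimal sketch of the stripping step (it mirrors the `removeInternalMetadata` helper that this PR adds to the `org.apache.spark.sql.catalyst.util` package object; the key list is abbreviated for illustration):

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StructType}

object InternalMetadataCleaner {
  // Abbreviated list of internal metadata keys; the full list in this PR also
  // covers the file-source metadata keys.
  private val internalKeys =
    Seq("__autoGeneratedAlias", "__metadata_col", "__qualified_access_only")

  // Drop every internal key from each field's metadata before the schema is
  // handed to a catalog, which is what the CTAS/RTAS code paths below do.
  def stripInternalMetadata(schema: StructType): StructType = {
    StructType(schema.map { field =>
      var builder = new MetadataBuilder().withMetadata(field.metadata)
      internalKeys.foreach(key => builder = builder.remove(key))
      field.copy(metadata = builder.build())
    })
  }
}
```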

### Why are the changes needed?

To avoid potential issues, such as mistakenly treating a regular data column as a metadata column.
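
For instance (an illustrative scenario modeled on the new test, where `index` is a metadata column of the source table; the table names here are hypothetical):

```scala
// `testcat.source` is assumed to expose `index` as a metadata column.
// Without the fix, the CTAS below could copy the internal `__metadata_col`
// marker into the new table's schema, so the plain data column `index` of
// `testcat.target` might later be mistaken for a metadata column.
spark.sql("CREATE TABLE testcat.target AS SELECT index FROM testcat.source")
```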

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new test in `MetadataColumnSuite` that runs a CTAS against a V2 catalog and verifies the created columns carry no field metadata.

Closes #40776 from cloud-fan/meta.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Apr 14, 2023
1 parent ea49637 commit 4c938d6
Showing 5 changed files with 69 additions and 32 deletions.
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.{View => _, _}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -492,7 +492,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case l: Literal => Alias(l, toPrettySQL(l))()
case e =>
val metaForAutoGeneratedAlias = new MetadataBuilder()
.putString("__autoGeneratedAlias", "true")
.putString(AUTO_GENERATED_ALIAS, "true")
.build()
Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
}
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -191,12 +191,13 @@ package object util extends Logging {

val METADATA_COL_ATTR_KEY = "__metadata_col"

/**
* If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
* `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
*/
val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"

implicit class MetadataColumnHelper(attr: Attribute) {
/**
* If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
* `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
*/
val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"

def isMetadataCol: Boolean = MetadataAttribute.isValid(attr.metadata)

@@ -225,4 +226,25 @@
}
}
}

val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"

val INTERNAL_METADATA_KEYS = Seq(
AUTO_GENERATED_ALIAS,
METADATA_COL_ATTR_KEY,
QUALIFIED_ACCESS_ONLY,
FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY,
FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY
)

def removeInternalMetadata(schema: StructType): StructType = {
StructType(schema.map { field =>
var builder = new MetadataBuilder().withMetadata(field.metadata)
INTERNAL_METADATA_KEYS.foreach { key =>
builder = builder.remove(key)
}
field.copy(metadata = builder.build())
})
}
}
@@ -22,7 +22,7 @@ import java.net.URI
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.execution.datasources._
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
}
val result = saveDataIntoTable(
sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
val tableSchema = CharVarcharUtils.getRawSchema(result.schema, sessionState.conf)
val tableSchema = CharVarcharUtils.getRawSchema(
removeInternalMetadata(result.schema), sessionState.conf)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -26,15 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, WriteDeltaProjections}
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{LongAccumulator, Utils}

/**
@@ -69,7 +70,7 @@ case class CreateTableAsSelectExec(
query: LogicalPlan,
tableSpec: TableSpec,
writeOptions: Map[String, String],
ifNotExists: Boolean) extends TableWriteExecHelper {
ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {

val properties = CatalogV2Util.convertTableProperties(tableSpec)

@@ -78,14 +79,10 @@
if (ifNotExists) {
return Nil
}

throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}

val columns = CatalogV2Util.structTypeToV2Columns(
CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val table = catalog.createTable(ident, columns,
partitioning.toArray, properties.asJava)
val table = catalog.createTable(
ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident, query)
}
}
@@ -106,7 +103,7 @@ case class AtomicCreateTableAsSelectExec(
query: LogicalPlan,
tableSpec: TableSpec,
writeOptions: Map[String, String],
ifNotExists: Boolean) extends TableWriteExecHelper {
ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {

val properties = CatalogV2Util.convertTableProperties(tableSpec)

@@ -115,13 +112,10 @@
if (ifNotExists) {
return Nil
}

throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
val columns = CatalogV2Util.structTypeToV2Columns(
CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val stagedTable = catalog.stageCreate(
ident, columns, partitioning.toArray, properties.asJava)
ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
writeToTable(catalog, stagedTable, writeOptions, ident, query)
}
}
@@ -144,7 +138,8 @@ case class ReplaceTableAsSelectExec(
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean,
invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
invalidateCache: (TableCatalog, Table, Identifier) => Unit)
extends V2CreateTableAsSelectBaseExec {

val properties = CatalogV2Util.convertTableProperties(tableSpec)

@@ -164,10 +159,8 @@
} else if (!orCreate) {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
val columns = CatalogV2Util.structTypeToV2Columns(
CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val table = catalog.createTable(
ident, columns, partitioning.toArray, properties.asJava)
ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
writeToTable(catalog, table, writeOptions, ident, query)
}
}
@@ -192,13 +185,13 @@ case class AtomicReplaceTableAsSelectExec(
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean,
invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
invalidateCache: (TableCatalog, Table, Identifier) => Unit)
extends V2CreateTableAsSelectBaseExec {

val properties = CatalogV2Util.convertTableProperties(tableSpec)

override protected def run(): Seq[InternalRow] = {
val columns = CatalogV2Util.structTypeToV2Columns(
CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
val columns = getV2Columns(query.schema)
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
invalidateCache(catalog, table, ident)
@@ -559,9 +552,14 @@ case class DeltaWithMetadataWritingSparkTask(
}
}

private[v2] trait TableWriteExecHelper extends LeafV2CommandExec {
private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
override def output: Seq[Attribute] = Nil

protected def getV2Columns(schema: StructType): Array[Column] = {
CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(
removeInternalMetadata(schema), conf).asNullable)
}

protected def writeToTable(
catalog: TableCatalog,
table: Table,
@@ -18,8 +18,11 @@
package org.apache.spark.sql.connector

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.IntegerType

class MetadataColumnSuite extends DatasourceV2SQLBase {
import testImplicits._
@@ -340,4 +343,17 @@
assert(relations(0).output != relations(1).output)
}
}

test("SPARK-43123: Metadata column related field metadata should not be leaked to catalogs") {
withTable(tbl, "testcat.target") {
prepareTable()
sql(s"CREATE TABLE testcat.target AS SELECT index FROM $tbl")
val cols = catalog("testcat").asTableCatalog.loadTable(
Identifier.of(Array.empty, "target")).columns()
assert(cols.length == 1)
assert(cols.head.name() == "index")
assert(cols.head.dataType() == IntegerType)
assert(cols.head.metadataInJSON() == null)
}
}
}
