Skip to content

Commit

Permalink
Address Carmen's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Jul 10, 2024
1 parent 2f7492a commit c132361
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,29 +131,9 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}

val isChangingModeOnExistingTable = oldMappingMode != newMappingMode && !isCreatingNewTable
if (isChangingModeOnExistingTable) {
if (!allowMappingModeChange(oldMappingMode, newMappingMode)) {
throw DeltaErrors.changeColumnMappingModeNotSupported(
oldMappingMode.name, newMappingMode.name)
} else {
// legal mode change, now check if protocol is upgraded before or part of this txn
val caseInsensitiveMap = CaseInsensitiveMap(newMetadata.configuration)
val minReaderVersion = caseInsensitiveMap
.get(Protocol.MIN_READER_VERSION_PROP).map(_.toInt)
.getOrElse(oldProtocol.minReaderVersion)
val minWriterVersion = caseInsensitiveMap
.get(Protocol.MIN_WRITER_VERSION_PROP).map(_.toInt)
.getOrElse(oldProtocol.minWriterVersion)
var newProtocol = Protocol(minReaderVersion, minWriterVersion)
val satisfiesWriterVersion = minWriterVersion >= ColumnMappingTableFeature.minWriterVersion
val satisfiesReaderVersion = minReaderVersion >= ColumnMappingTableFeature.minReaderVersion
// This is an OR check because `readerFeatures` and `writerFeatures` can independently
// support table features.
if ((newProtocol.supportsReaderFeatures && satisfiesWriterVersion) ||
(newProtocol.supportsWriterFeatures && satisfiesReaderVersion)) {
newProtocol = newProtocol.withFeature(ColumnMappingTableFeature)
}
}
if (isChangingModeOnExistingTable && !allowMappingModeChange(oldMappingMode, newMappingMode)) {
throw DeltaErrors.changeColumnMappingModeNotSupported(
oldMappingMode.name, newMappingMode.name)
}

val updatedMetadata = updateColumnMappingMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,10 @@ object Protocol {
val (readerVersion, writerVersion, enabledFeatures) =
minProtocolComponentsFromMetadata(spark, metadata)
// New table protocols should always be denormalized and then normalized to convert the
// protocol to the weakest possible form. For example:
// 1) (3, 7, RowIDs) is normalized to (3, 7, RowIDs).
// protocol to the weakest possible form. This means either converting a table features
// protocol to a legacy protocol or reducing the versions of a table features protocol.
// For example:
// 1) (3, 7, RowTracking) is normalized to (1, 7, RowTracking).
// 2) (3, 7, AppendOnly, Invariants) is normalized to (1, 2).
// 3) (2, 3) is normalized to (1, 3).
Protocol(readerVersion, writerVersion)
Expand Down
16 changes: 10 additions & 6 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale
import scala.language.postfixOps

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, TestReaderWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
Expand Down Expand Up @@ -574,13 +574,17 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
// update the protocol to support a writer feature.
val table = DeltaTable.forPath(spark, path, fsOptions)
table.addFeatureSupport(TestWriterFeature.name)
assert(log.update().protocol === Protocol(1, 7)
.withFeature(TestWriterFeature).merge(Protocol(1, 2)))
assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestWriterFeature)))
table.addFeatureSupport(TestReaderWriterFeature.name)
assert(
log.update().protocol === Protocol(3, 7)
.withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))
.merge(Protocol(1, 2)))
log.update().protocol === Protocol(3, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestWriterFeature,
TestReaderWriterFeature)))

// update the protocol again with invalid feature name.
assert(intercept[DeltaTableFeatureException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1895,9 +1895,7 @@ class DeltaColumnMappingSuite extends QueryTest
s"""CREATE TABLE $testTableName
|USING DELTA
|TBLPROPERTIES(
|'$minReaderKey' = '3',
|'$minWriterKey' = '7',
|'${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true'
|'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true'
|)
|AS SELECT * FROM RANGE(1)
|""".stripMargin)
Expand All @@ -1909,6 +1907,14 @@ class DeltaColumnMappingSuite extends QueryTest
s"""ALTER TABLE $testTableName SET TBLPROPERTIES(
|'$columnMappingMode'='name'
|)""".stripMargin)

val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))
assert(deltaLog.update().protocol === Protocol(2, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
ColumnMappingTableFeature,
RowTrackingFeature
)))
}
}

Expand Down

0 comments on commit c132361

Please sign in to comment.