Skip to content

Commit

Permalink
Addressed conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Aug 19, 2024
1 parent 835f918 commit e804b85
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val existingFeatureNames = newProtocolBeforeAddingFeatures.readerAndWriterFeatureNames
if (!newFeaturesFromTableConf.map(_.name).subsetOf(existingFeatureNames)) {
// When enabling legacy features, include all preceding legacy features.
val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(newFeaturesFromTableConf)
val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(newFeaturesFromTableConf)
newProtocol = Some(
Protocol(
readerVersionForNewProtocol,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,15 @@ object TableFeatureProtocolUtils {
((features.map(_.minReaderVersion) :+ 1).max, (features.map(_.minWriterVersion) :+ 1).max)

/**
* Return a set with the implicit features of the provided feature set.
* Returns a set of legacy features that contains the input features as well as the
* features that will be supported by the protocol as a "byproduct" of supporting the
* given legacy `features`.
*
* As an example, the legacy protocol for supporting ColumnMapping also supports
* AppendOnly, Invariants, CheckConstraints, CDF, and GeneratedColumns as byproducts,
* and there is no way to avoid supporting them.
*/
def implicitFeatures(features: Set[TableFeature]): Set[TableFeature] =
def implicitlySupportedFeatures(features: Set[TableFeature]): Set[TableFeature] =
features.flatMap { f =>
Protocol(f.minReaderVersion, f.minWriterVersion).implicitlySupportedFeatures
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ object Protocol {
}

// When enabling legacy features, include all preceding legacy features.
val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(allEnabledFeatures)
val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(allEnabledFeatures)

(finalReaderVersion, finalWriterVersion,
allEnabledFeatures ++ implicitFeatures ++ implicitFeaturesFromTableConf)
Expand Down Expand Up @@ -430,7 +430,7 @@ object Protocol {
}

// When enabling legacy features, include all preceding legacy features.
val implicitFeatures = TableFeatureProtocolUtils.implicitFeatures(enabledFeatures)
val implicitFeatures = TableFeatureProtocolUtils.implicitlySupportedFeatures(enabledFeatures)

(readerVersion, writerVersion, enabledFeatures ++ implicitFeatures)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2407,25 +2407,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
|$featureProperty = "true"
|)""".stripMargin)

/*
val expectedReaderVersion = Math.max(feature.minReaderVersion, 1)
val expectedReaderVersion = Math.max(1, feature.minReaderVersion)
val expectProtocol = if (feature.isLegacyFeature) {
Protocol(expectedReaderVersion, feature.minWriterVersion)
} else {
Protocol(
expectedReaderVersion,
TABLE_FEATURES_MIN_WRITER_VERSION,
if (supportsReaderFeatures(expectedReaderVersion)) Some(Set(feature.name)) else None,
Some(Set(feature.name, InvariantsTableFeature.name, AppendOnlyTableFeature.name)))
}
*/
val expectProtocol = if (feature.isLegacyFeature) {
Protocol(Math.max(1, feature.minReaderVersion), feature.minWriterVersion)
} else {
Protocol(3, 7).withFeatures(Seq(
Protocol(expectedReaderVersion, 7).withFeatures(Seq(
InvariantsTableFeature,
AppendOnlyTableFeature,
feature)).normalized
feature))
}

assert(deltaLog.update().protocol === expectProtocol)
Expand Down Expand Up @@ -4205,3 +4194,4 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
}

class DeltaProtocolVersionSuite extends DeltaProtocolVersionSuiteBase
with DeltaProtocolVersionSuiteEdge
Original file line number Diff line number Diff line change
Expand Up @@ -335,17 +335,17 @@ class DeltaTableFeatureSuite
val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
val protocol = log.update().protocol
assert(protocol.readerAndWriterFeatureNames === Set(
AppendOnlyTableFeature.name,
InvariantsTableFeature.name,
CheckConstraintsTableFeature.name,
GeneratedColumnsTableFeature.name,
ChangeDataFeedTableFeature.name,
ColumnMappingTableFeature.name,
TestLegacyWriterFeature.name,
TestRemovableLegacyReaderWriterFeature.name,
TestLegacyReaderWriterFeature.name,
TestRemovableLegacyWriterFeature.name,
TestWriterFeature.name))
AppendOnlyTableFeature,
InvariantsTableFeature,
CheckConstraintsTableFeature,
GeneratedColumnsTableFeature,
ChangeDataFeedTableFeature,
ColumnMappingTableFeature,
TestLegacyWriterFeature,
TestRemovableLegacyReaderWriterFeature,
TestLegacyReaderWriterFeature,
TestRemovableLegacyWriterFeature,
TestWriterFeature).map(_.name))
}
}
}
Expand Down Expand Up @@ -396,12 +396,12 @@ class DeltaTableFeatureSuite
commandName, targetTableName = "tbl", sourceTableName = "tbl", tblProperties))
val protocol = log.update().protocol
assert(protocol.readerAndWriterFeatureNames === Set(
AppendOnlyTableFeature.name,
InvariantsTableFeature.name,
CheckConstraintsTableFeature.name,
GeneratedColumnsTableFeature.name,
ChangeDataFeedTableFeature.name,
TestWriterFeature.name))
AppendOnlyTableFeature,
InvariantsTableFeature,
CheckConstraintsTableFeature,
GeneratedColumnsTableFeature,
ChangeDataFeedTableFeature,
TestWriterFeature).map(_.name))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaConfigs._
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -81,31 +83,35 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {
}

test("drop column mapping from a table without table feature") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping()
withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping(protocolContainsDVs = false)
}
}

test("drop column mapping from a table with table feature") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping()
withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'true',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping(protocolContainsDVs = true)
}
}

test("drop column mapping from a table without column mapping table property") {
Expand Down Expand Up @@ -135,20 +141,22 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {
}

test("drop column mapping in id mode") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping()
withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id',
| '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false',
| 'delta.minReaderVersion' = '3',
| 'delta.minWriterVersion' = '7')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
testDroppingColumnMapping(protocolContainsDVs = false)
}
}

def testDroppingColumnMapping(): Unit = {
def testDroppingColumnMapping(protocolContainsDVs: Boolean): Unit = {
// Verify the input data is as expected.
val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect()
// Add a schema comment and verify it is preserved after the rewrite.
Expand Down Expand Up @@ -186,10 +194,10 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {
// Verify the schema comment is preserved after the rewrite.
assert(deltaLog.update().schema.head.getComment().get == comment,
"Should preserve the schema comment.")
verifyDropFeatureTruncateHistory()
verifyDropFeatureTruncateHistory(protocolContainsDVs)
}

protected def verifyDropFeatureTruncateHistory() = {
protected def verifyDropFeatureTruncateHistory(protocolContainsDVs: Boolean) = {
val deltaLog1 = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName), clock)
// Populate the delta cache with the delta log with the right data path so it stores the clock.
// This is currently the only way to make sure the drop feature command uses the clock.
Expand All @@ -209,8 +217,23 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils {
|ALTER TABLE $testTableName DROP FEATURE ${ColumnMappingTableFeature.name} TRUNCATE HISTORY
|""".stripMargin)
val newSnapshot = deltaLog.update()
assert(!newSnapshot.protocol.readerAndWriterFeatures.contains(ColumnMappingTableFeature),
"Should drop the feature.")

val expectedProtocol = if (protocolContainsDVs) {
Protocol(
minReaderVersion = 3,
minWriterVersion = 7,
Some(Set(DeletionVectorsTableFeature.name)),
Some(Set(
AppendOnlyTableFeature,
InvariantsTableFeature,
CheckConstraintsTableFeature,
ChangeDataFeedTableFeature,
GeneratedColumnsTableFeature,
DeletionVectorsTableFeature).map(_.name)))
} else {
Protocol(1, 4)
}
assert(newSnapshot.protocol === expectedProtocol)
}

protected def dropColumnMappingTableFeature(): Unit = {
Expand Down

0 comments on commit e804b85

Please sign in to comment.