Skip to content

Commit

Permalink
ddl: Exchange part schema load fix (pingcap#46126) (pingcap#46191)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Sep 16, 2023
1 parent ec0ffe8 commit 774c4e7
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 31 deletions.
45 changes: 29 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,27 +1556,40 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
// From start of function: diff.SchemaID = job.SchemaID
// Old is original non partitioned table
diff.OldTableID = job.TableID
diff.OldSchemaID = job.SchemaID
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
// This is needed for not crashing TiFlash!
// TODO: Update TiFlash, to handle StateWriteOnly
diff.AffectedOpts = []*model.AffectedOption{{
TableID: ptTableID,
}}
if job.SchemaState != model.StatePublic {
// No change, just to refresh the non-partitioned table
// with its new ExchangePartitionInfo.
diff.TableID = job.TableID
diff.SchemaID = job.SchemaID
// Keep this as Schema ID of non-partitioned table
// to avoid trigger early rename in TiFlash
diff.AffectedOpts[0].SchemaID = job.SchemaID
} else {
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64 // Not needed, will reload the whole table
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.SchemaID = ptSchemaID
diff.TableID = ptTableID
// Swap
diff.TableID = ptDefID
// Also add correct SchemaID in case different schemas
diff.AffectedOpts[0].SchemaID = ptSchemaID
}
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
Expand Down
1 change: 0 additions & 1 deletion ddl/metadatalocktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_test(
"//ddl",
"//errno",
"//server",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"//util/logutil",
Expand Down
13 changes: 5 additions & 8 deletions ddl/metadatalocktest/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/failpoint"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1148,6 +1147,7 @@ func TestExchangePartitionStates(t *testing.T) {
tk.MustExec("create database " + dbName)
tk.MustExec("use " + dbName)
tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`)
defer tk.MustExec(`set @@global.tidb_enable_metadata_lock = DEFAULT`)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use " + dbName)
tk3 := testkit.NewTestKit(t, store)
Expand All @@ -1159,7 +1159,6 @@ func TestExchangePartitionStates(t *testing.T) {
tk.MustExec(`insert into t values (1, "1")`)
tk.MustExec(`insert into tp values (2, "2")`)
tk.MustExec(`analyze table t,tp`)
tk.MustQuery(`select * from information_schema.global_variables`).Check(testkit.Rows())
var wg sync.WaitGroup
wg.Add(1)
dumpChan := make(chan struct{})
Expand All @@ -1171,7 +1170,6 @@ func TestExchangePartitionStates(t *testing.T) {
tk.MustExec("BEGIN")
tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1"))
tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2"))
//require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`))
alterChan := make(chan error)
go func() {
// WITH VALIDATION is the default
Expand All @@ -1193,6 +1191,8 @@ func TestExchangePartitionStates(t *testing.T) {
}
time.Sleep(50 * time.Millisecond)
}
// Sleep 50ms to wait load InfoSchema finish, issue #46815.
time.Sleep(50 * time.Millisecond)
}
waitFor("t", "write only", 4)
tk3.MustExec(`BEGIN`)
Expand All @@ -1205,7 +1205,6 @@ func TestExchangePartitionStates(t *testing.T) {
// MDL will block the alter to not continue until all clients
// are in StateWriteOnly, which tk is blocking until it commits
tk.MustExec(`COMMIT`)
//require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID"))
waitFor("t", "rollback done", 11)
// MDL will block the alter from finish, tk is in 'rollbacked' schema version
// but the alter is still waiting for tk3 to commit, before continuing
Expand Down Expand Up @@ -1260,10 +1259,8 @@ func TestExchangePartitionStates(t *testing.T) {
}

func TestExchangePartitionMultiTable(t *testing.T) {
logutil.BgLogger().Info("mdl related variable status before bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load()))
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
logutil.BgLogger().Info("mdl related variable status after bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load()))

dbName := "ExchangeMultiTable"
tk1.MustExec(`create schema ` + dbName)
Expand All @@ -1275,7 +1272,6 @@ func TestExchangePartitionMultiTable(t *testing.T) {
tk1.MustExec(`insert into t1 values (0)`)
tk1.MustExec(`insert into t2 values (3)`)
tk1.MustExec(`insert into tp values (6)`)
logutil.BgLogger().Info("mdl related variable status after inserting rows", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load()))

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(`use ` + dbName)
Expand Down Expand Up @@ -1304,6 +1300,8 @@ func TestExchangePartitionMultiTable(t *testing.T) {
}
time.Sleep(100 * time.Millisecond)
}
// Sleep 50ms to wait load InfoSchema finish, issue #46815.
time.Sleep(50 * time.Millisecond)
}
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -1319,7 +1317,6 @@ func TestExchangePartitionMultiTable(t *testing.T) {
tk3.MustExec(`insert into t1 values (1)`)
tk3.MustExec(`insert into t2 values (2)`)
tk3.MustExec(`insert into tp values (3)`)
//require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`))
go func() {
alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`)
}()
Expand Down
23 changes: 23 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,16 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

if defID != partDef.ID {
logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}
nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
ExchangePartitionID: ptID,
ExchangePartitionDefID: defID,
Expand All @@ -2094,6 +2104,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
delayForAsyncCommit()
}

if defID != partDef.ID {
// Should never happen, should have been updated above, in previous state!
logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"),
zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID))
job.Args[0] = partDef.ID
defID = partDef.ID
err = updateDDLJob2Table(w.sess, job, true)
if err != nil {
return ver, errors.Trace(err)
}
}

if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
if err != nil {
Expand Down Expand Up @@ -2201,6 +2223,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
ntr := rules[ntrID]
ptr := rules[ptrID]

// This must be a bug, nt cannot be partitioned!
partIDs := getPartitionIDs(nt)

var setRules []*label.Rule
Expand Down
28 changes: 22 additions & 6 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,31 +296,46 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi
ntID := diff.OldTableID
ptSchemaID := diff.SchemaID
ptID := diff.TableID
partID := diff.TableID
if len(diff.AffectedOpts) > 0 {
// From old version
ptID = diff.AffectedOpts[0].TableID
ptSchemaID = diff.AffectedOpts[0].SchemaID
if diff.AffectedOpts[0].SchemaID != 0 {
ptSchemaID = diff.AffectedOpts[0].SchemaID
}
}
// The normal table needs to be updated first:
// Just update the tables separately
currDiff := &model.SchemaDiff{
// This is only for the case since https://github.com/pingcap/tidb/pull/45877
// Fixed now, by adding back the AffectedOpts
// to carry the partitioned Table ID.
Type: diff.Type,
Version: diff.Version,
TableID: ntID,
SchemaID: ntSchemaID,
}
if ptID != partID {
currDiff.TableID = partID
currDiff.OldTableID = ntID
currDiff.OldSchemaID = ntSchemaID
}
ntIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
b.markPartitionBundleShouldUpdate(ntID)
// Then the partitioned table
// partID is the new id for the non-partitioned table!
b.markTableBundleShouldUpdate(partID)
// Then the partitioned table, will re-read the whole table, including all partitions!
currDiff.TableID = ptID
currDiff.SchemaID = ptSchemaID
currDiff.OldTableID = ptID
currDiff.OldSchemaID = ptSchemaID
ptIDs, err := b.applyTableUpdate(m, currDiff)
if err != nil {
return nil, errors.Trace(err)
}
b.markTableBundleShouldUpdate(ptID)
// ntID is the new id for the partition!
b.markPartitionBundleShouldUpdate(ntID)
err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -426,7 +441,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6
newTableID = diff.TableID
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
oldTableID = diff.TableID
case model.ActionTruncateTable, model.ActionCreateView:
case model.ActionTruncateTable, model.ActionCreateView,
model.ActionExchangeTablePartition:
oldTableID = diff.OldTableID
newTableID = diff.TableID
default:
Expand Down

0 comments on commit 774c4e7

Please sign in to comment.