Skip to content

Commit

Permalink
ddl: support multi-schema change for modify columns (pingcap#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Apr 12, 2022
1 parent 21669d0 commit 5863bae
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 132 deletions.
225 changes: 167 additions & 58 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,13 @@ type modifyingColInfo struct {
changingCol *model.ColumnInfo
changingIdxs []*model.IndexInfo
pos *ast.ColumnPosition
removedIdxs []int64
}

func getModifyColumnInfo(t *meta.Meta, job *model.Job) (*model.DBInfo, *model.TableInfo, *model.ColumnInfo, *modifyingColInfo, error) {
modifyInfo := &modifyingColInfo{pos: &ast.ColumnPosition{}}
err := job.DecodeArgs(&modifyInfo.newCol, &modifyInfo.oldColName, modifyInfo.pos, &modifyInfo.modifyColumnTp, &modifyInfo.updatedAutoRandomBits, &modifyInfo.changingCol, &modifyInfo.changingIdxs)
err := job.DecodeArgs(&modifyInfo.newCol, &modifyInfo.oldColName, modifyInfo.pos, &modifyInfo.modifyColumnTp,
&modifyInfo.updatedAutoRandomBits, &modifyInfo.changingCol, &modifyInfo.changingIdxs, &modifyInfo.removedIdxs)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, modifyInfo, errors.Trace(err)
Expand Down Expand Up @@ -549,49 +551,69 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
return ver, errors.Trace(err)
}

if modifyInfo.changingCol == nil {
changingCol := modifyInfo.changingCol
if changingCol == nil {
newColName := model.NewCIStr(genChangingColumnUniqueName(tblInfo, oldCol))
if mysql.HasPriKeyFlag(oldCol.Flag) {
job.State = model.JobStateCancelled
msg := "this column has primary key flag"
return ver, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs(msg)
}

modifyInfo.changingCol = modifyInfo.newCol.Clone()
modifyInfo.changingCol.Name = newColName
modifyInfo.changingCol.ChangeStateInfo = &model.ChangeStateInfo{DependencyColumnOffset: oldCol.Offset}
originDefVal, err := getOriginDefaultValueForModifyColumn(d, modifyInfo.changingCol, oldCol)
changingCol = modifyInfo.newCol.Clone()
changingCol.Name = newColName
changingCol.ChangeStateInfo = &model.ChangeStateInfo{DependencyColumnOffset: oldCol.Offset}

originDefVal, err := getOriginDefaultValueForModifyColumn(d, changingCol, oldCol)
if err != nil {
return ver, errors.Trace(err)
}
if err = modifyInfo.changingCol.SetOriginDefaultValue(originDefVal); err != nil {
if err = changingCol.SetOriginDefaultValue(originDefVal); err != nil {
return ver, errors.Trace(err)
}

createColumnInfo(tblInfo, modifyInfo.changingCol)

idxInfos, offsets := findIndexesByColName(tblInfo.Indices, oldCol.Name.L)
modifyInfo.changingIdxs = make([]*model.IndexInfo, 0, len(idxInfos))
for i, idxInfo := range idxInfos {
newIdxInfo := idxInfo.Clone()
newIdxInfo.Name = model.NewCIStr(genChangingIndexUniqueName(tblInfo, idxInfo))
newIdxInfo.ID = allocateIndexID(tblInfo)
newIdxChangingCol := newIdxInfo.Columns[offsets[i]]
newIdxChangingCol.Name = newColName
newIdxChangingCol.Offset = modifyInfo.changingCol.Offset
canPrefix := types.IsTypePrefixable(modifyInfo.changingCol.Tp)
if !canPrefix || (canPrefix && modifyInfo.changingCol.Flen < newIdxChangingCol.Length) {
newIdxChangingCol.Length = types.UnspecifiedLength
createColumnInfo(tblInfo, changingCol)
indexesToChange := findIndexesByColName(tblInfo, oldCol.Name)
var indexesToRemove []int64
for _, info := range indexesToChange {
newIdxID := allocateIndexID(tblInfo)
if info.isModifying {
newIdxName := info.indexInfo.Name
newIdxInfo := copyIndexInfoForModifyColumn(info.indexInfo, newIdxID, newIdxName, info.colOffset, changingCol)
indexesToRemove = append(indexesToRemove, info.indexInfo.ID)
tblInfo.Indices[info.idxOffset] = newIdxInfo
} else {
newIdxName := model.NewCIStr(genChangingIndexUniqueName(tblInfo, info.indexInfo))
newIdxInfo := copyIndexInfoForModifyColumn(info.indexInfo, newIdxID, newIdxName, info.colOffset, changingCol)
tblInfo.Indices = append(tblInfo.Indices, newIdxInfo)
}
modifyInfo.changingIdxs = append(modifyInfo.changingIdxs, newIdxInfo)
}
tblInfo.Indices = append(tblInfo.Indices, modifyInfo.changingIdxs...)
modifyInfo.removedIdxs = indexesToRemove
} else {
tblInfo.Columns[len(tblInfo.Columns)-1] = modifyInfo.changingCol
copy(tblInfo.Indices[len(tblInfo.Indices)-len(modifyInfo.changingIdxs):], modifyInfo.changingIdxs)
changingCol = model.FindColumnInfoByID(tblInfo.Columns, modifyInfo.changingCol.ID)
if changingCol == nil {
logutil.BgLogger().Error("[ddl] the changing column has been removed", zap.Error(err))
job.State = model.JobStateCancelled
return ver, errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
}
}

return w.doModifyColumnTypeWithData(d, t, job, dbInfo, tblInfo, modifyInfo.changingCol, oldCol, modifyInfo.newCol.Name, modifyInfo.pos, modifyInfo.changingIdxs)
return w.doModifyColumnTypeWithData(d, t, job, dbInfo, tblInfo, changingCol, oldCol, modifyInfo.newCol.Name, modifyInfo.pos, modifyInfo.removedIdxs)
}

func copyIndexInfoForModifyColumn(idxInfo *model.IndexInfo, newIndexID int64, newIndexName model.CIStr,
colOffset int, changingCol *model.ColumnInfo) *model.IndexInfo {
newIdxInfo := idxInfo.Clone()
newIdxInfo.Name = newIndexName
newIdxInfo.ID = newIndexID
newIdxChangingCol := newIdxInfo.Columns[colOffset]
newIdxChangingCol.Name = changingCol.Name
newIdxChangingCol.Offset = changingCol.Offset
canPrefix := types.IsTypePrefixable(changingCol.Tp)
if !canPrefix || (canPrefix && changingCol.Flen < newIdxChangingCol.Length) {
newIdxChangingCol.Length = types.UnspecifiedLength
}
return newIdxInfo
}

// rollbackModifyColumnJobWithData is used to rollback modify-column job which need to reorg the data.
Expand All @@ -603,34 +625,62 @@ func rollbackModifyColumnJobWithData(t *meta.Meta, tblInfo *model.TableInfo, job
// Reset PreventNullInsertFlag flag.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.PreventNullInsertFlag
}
var changingIdxIDs []int64
if modifyInfo.changingCol != nil {
// changingCol isn't nil means the job has been in the mid state. These appended changingCol and changingIndex should
changingIdxIDs = getRelatedIndexIDs(tblInfo, modifyInfo.changingCol.ID)
// The job is in the middle state. The appended changingCol and changingIndex should
// be removed from the tableInfo as well.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
tblInfo.Indices = tblInfo.Indices[:len(tblInfo.Indices)-len(modifyInfo.changingIdxs)]
removeChangingColAndIdxs(tblInfo, modifyInfo.changingCol.ID)
}
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
// Refactor the job args to add the abandoned temporary index ids into delete range table.
idxIDs := make([]int64, 0, len(modifyInfo.changingIdxs))
for _, idx := range modifyInfo.changingIdxs {
idxIDs = append(idxIDs, idx.ID)
}
job.Args = []interface{}{idxIDs, getPartitionIDs(tblInfo)}
// Reconstruct the job args to add the abandoned temporary index ids into delete range table.
job.Args = []interface{}{changingIdxIDs, getPartitionIDs(tblInfo)}
return ver, nil
}

func removeChangingColAndIdxs(tblInfo *model.TableInfo, changingColID int64) {
var colName string
for i, c := range tblInfo.Columns {
if c.ID == changingColID {
tblInfo.MoveColumnInfo(i, len(tblInfo.Columns)-1)
colName = c.Name.L
break
}
}
if len(colName) == 0 {
return
}
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
for i, idx := range tblInfo.Indices {
for _, idxCol := range idx.Columns {
if colName == idxCol.Name.L {
tblInfo.Indices[i] = nil
break
}
}
}
tmp := tblInfo.Indices[:0]
for _, idx := range tblInfo.Indices {
if idx != nil {
tmp = append(tmp, idx)
}
}
tblInfo.Indices = tmp
}

func (w *worker) doModifyColumnTypeWithData(
d *ddlCtx, t *meta.Meta, job *model.Job,
dbInfo *model.DBInfo, tblInfo *model.TableInfo, changingCol, oldCol *model.ColumnInfo,
colName model.CIStr, pos *ast.ColumnPosition, changingIdxs []*model.IndexInfo) (ver int64, _ error) {
colName model.CIStr, pos *ast.ColumnPosition, rmIdxIDs []int64) (ver int64, _ error) {
var err error
originalState := changingCol.State
targetCol := changingCol.Clone()
targetCol.Name = colName
changingIdxs := getRelatedIndexInfos(tblInfo, changingCol.ID)
switch changingCol.State {
case model.StateNone:
// Column from null to not null.
Expand Down Expand Up @@ -670,7 +720,7 @@ func (w *worker) doModifyColumnTypeWithData(
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
job.SchemaState = model.StateDeleteOnly
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(0)
job.Args = append(job.Args, changingCol, changingIdxs)
job.Args = append(job.Args, changingCol, changingIdxs, rmIdxIDs)
case model.StateDeleteOnly:
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(changingCol.Flag) {
Expand Down Expand Up @@ -707,12 +757,12 @@ func (w *worker) doModifyColumnTypeWithData(
}

var done bool
done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
done, ver, err = doReorgWorkForModifyColumnMultiSchema(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
if !done {
return ver, err
}

oldIdxIDs := getOldIndexIDs(tblInfo, oldCol) // used by GC delete range.
rmIdxIDs = append(getRelatedIndexIDs(tblInfo, oldCol.ID), rmIdxIDs...)

err = adjustTableInfoAfterModifyColumnWithData(tblInfo, pos, oldCol, changingCol, colName, changingIdxs)
if err != nil {
Expand All @@ -729,7 +779,7 @@ func (w *worker) doModifyColumnTypeWithData(
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
// Refactor the job args to add the old index ids into delete range table.
job.Args = []interface{}{oldIdxIDs, getPartitionIDs(tblInfo)}
job.Args = []interface{}{rmIdxIDs, getPartitionIDs(tblInfo)}
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionModifyColumn, TableInfo: tblInfo, ColumnInfos: []*model.ColumnInfo{changingCol}})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
Expand All @@ -738,6 +788,30 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo != nil {
if job.IsCancelling() {
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback",
zap.String("job", job.String()), zap.Error(err))
w.reorgCtx.cleanNotifyReorgCancel()
job.State = model.JobStateRollingback
return false, ver, err
}
if job.MultiSchemaInfo.Revertible {
done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
if done {
job.MarkNonRevertible()
done = false // Wait for the other sub jobs.
}
return done, ver, err
}
// Non-revertible means all the sub jobs finished.
return true, ver, err
}
return doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
}

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
Expand Down Expand Up @@ -796,8 +870,8 @@ func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast
internalColName := changingCol.Name
changingCol = replaceOldColumn(tblInfo, oldCol, changingCol, newName)
if len(changingIdxs) > 0 {
replaceOldIndexes(tblInfo, changingIdxs)
updateNewIndexesCols(tblInfo, internalColName, newName, changingCol.Offset)
indexesToRemove := updateNewIndexesCols(changingIdxs, internalColName, newName, changingCol.Offset)
replaceOldIndexes(tblInfo, indexesToRemove)
}
// Move the new column to a correct offset.
destOffset, err := locateOffsetToMove(changingCol.Offset, pos, tblInfo)
Expand All @@ -811,17 +885,10 @@ func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast
func replaceOldColumn(tblInfo *model.TableInfo, oldCol, changingCol *model.ColumnInfo,
newName model.CIStr) *model.ColumnInfo {
// Replace the old column.
tblInfo.Columns[oldCol.Offset] = changingCol
tblInfo.Columns[changingCol.Offset] = nil
tblInfo.MoveColumnInfo(changingCol.Offset, len(tblInfo.Columns)-1)
changingCol = updateChangingCol(changingCol, newName, oldCol.Offset)
// Remove nil column.
tmp := tblInfo.Columns[:0]
for _, c := range tblInfo.Columns {
if c != nil {
tmp = append(tmp, c)
}
}
tblInfo.Columns = tmp
tblInfo.Columns[oldCol.Offset] = changingCol
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
return changingCol
}

Expand Down Expand Up @@ -856,16 +923,29 @@ func replaceOldIndexes(tblInfo *model.TableInfo, changingIdxs []*model.IndexInfo
}
}

func updateNewIndexesCols(tblInfo *model.TableInfo, oldName, newName model.CIStr, newOffset int) {
for _, idx := range tblInfo.Indices {
// updateNewIndexesCols updates the changing indexes column name&offset, and
// filter out the indexes that can be removed.
func updateNewIndexesCols(changingIdxs []*model.IndexInfo,
oldName, newName model.CIStr, newOffset int) []*model.IndexInfo {
indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs))
for _, idx := range changingIdxs {
var hasOtherChangingCol bool
for i, col := range idx.Columns {
if col.Name.L == oldName.L {
idx.Columns[i].Name = newName
idx.Columns[i].Offset = newOffset
break
} else {
if !hasOtherChangingCol {
hasOtherChangingCol = getChangingColumnOriginName(col) != col.Name.O
}
}
}
// For the indexes that still contains other changing column, skip removing it now.
if !hasOtherChangingCol {
indexesToRemove = append(indexesToRemove, idx)
}
}
return indexesToRemove
}

func updateChangingCol(col *model.ColumnInfo, newName model.CIStr, newOffset int) *model.ColumnInfo {
Expand All @@ -880,11 +960,24 @@ func updateChangingCol(col *model.ColumnInfo, newName model.CIStr, newOffset int
return col
}

func getOldIndexIDs(tblInfo *model.TableInfo, oldCol *model.ColumnInfo) []int64 {
func getRelatedIndexInfos(tblInfo *model.TableInfo, colID int64) []*model.IndexInfo {
var indexInfos []*model.IndexInfo
for _, idx := range tblInfo.Indices {
for _, idxCol := range idx.Columns {
if tblInfo.Columns[idxCol.Offset].ID == colID {
indexInfos = append(indexInfos, idx)
break
}
}
}
return indexInfos
}

func getRelatedIndexIDs(tblInfo *model.TableInfo, colID int64) []int64 {
var oldIdxIDs []int64
for _, idx := range tblInfo.Indices {
for _, idxCol := range idx.Columns {
if tblInfo.Columns[idxCol.Offset].ID == oldCol.ID {
if tblInfo.Columns[idxCol.Offset].ID == colID {
oldIdxIDs = append(oldIdxIDs, idx.ID)
break
}
Expand Down Expand Up @@ -1287,6 +1380,12 @@ func (w *worker) doModifyColumn(
}
}

if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}

if err := adjustTableInfoAfterModifyColumn(tblInfo, newCol, oldCol, pos); err != nil {
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
Expand Down Expand Up @@ -1320,7 +1419,7 @@ func adjustTableInfoAfterModifyColumn(
}
tblInfo.Columns[oldCol.Offset] = newCol
tblInfo.MoveColumnInfo(oldCol.Offset, destOffset)
updateNewIndexesCols(tblInfo, oldCol.Name, newCol.Name, newCol.Offset)
updateNewIndexesCols(tblInfo.Indices, oldCol.Name, newCol.Name, newCol.Offset)
return nil
}

Expand Down Expand Up @@ -1687,6 +1786,16 @@ func genChangingIndexUniqueName(tblInfo *model.TableInfo, idxInfo *model.IndexIn
return fmt.Sprintf("%s_%d", newIndexNamePrefix, suffix)
}

func getChangingColumnOriginName(changingCol *model.IndexColumn) string {
colName := strings.ToLower(strings.TrimPrefix(changingCol.Name.O, changingColumnPrefix))
// Since the unique colName may contain the suffix number (columnName_num), better trim the suffix.
var pos int
if pos = strings.LastIndex(colName, "_"); pos == -1 {
return colName
}
return colName[:pos]
}

func getChangingIndexOriginName(changingIdx *model.IndexInfo) string {
idxName := strings.TrimPrefix(changingIdx.Name.O, changingIndexPrefix)
// Since the unique idxName may contain the suffix number (indexName_num), better trim the suffix.
Expand Down
Loading

0 comments on commit 5863bae

Please sign in to comment.