Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

fix restore auto increment ID overflow #458

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
180 changes: 110 additions & 70 deletions pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/glue"
Expand Down Expand Up @@ -108,94 +111,131 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
}

var restoreMetaSQL string
if table.Info.IsSequence() {
setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);",
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O))
if table.Info.Sequence.Cycle {
increment := table.Info.Sequence.Increment
// TiDB sequence's behaviour is designed to keep the same pace
// among all nodes within the same cluster. so we need restore round.
// Here is a hack way to trigger sequence cycle round > 0 according to
// https://github.com/pingcap/br/pull/242#issuecomment-631307978
// TODO use sql to set cycle round
nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);",
switch {
case table.Info.IsSequence():
{
setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);",
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O))
var setValSQL string
if increment < 0 {
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue)
if table.Info.Sequence.Cycle {
increment := table.Info.Sequence.Increment
// TiDB sequence's behaviour is designed to keep the same pace
// among all nodes within the same cluster. so we need restore round.
// Here is a hack way to trigger sequence cycle round > 0 according to
// https://github.com/pingcap/br/pull/242#issuecomment-631307978
// TODO use sql to set cycle round
nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);",
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O))
var setValSQL string
if increment < 0 {
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue)
} else {
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue)
}
err = db.se.Execute(ctx, setValSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", setValSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}

// trigger cycle round > 0
err = db.se.Execute(ctx, nextSeqSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", nextSeqSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
}
restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID)
err = db.se.Execute(ctx, restoreMetaSQL)
}
case utils.NeedAutoID(table.Info):
{
var alterAutoIncIDFormat string
switch {
case table.Info.IsView():
return nil
default:
alterAutoIncIDFormat = "alter table %s.%s auto_increment = %d;"
}

var autoIncID uint64
// auto inc id overflow
if table.Info.AutoIncID < 0 {
log.Info("table auto inc id overflow",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Int64("auto inc id", table.Info.AutoIncID),
)
autoIncID = uint64(table.Info.AutoIncID)
} else {
setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue)
autoIncID = uint64(table.Info.AutoIncID)
}
Comment on lines +178 to 181
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the two branches are the same? so this is simply

    }
    autoIncID = uint64(table.Info.AutoIncID)

?

err = db.se.Execute(ctx, setValSQL)
restoreMetaSQL = fmt.Sprintf(
alterAutoIncIDFormat,
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O),
autoIncID)

err = db.se.Execute(ctx, restoreMetaSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", setValSQL),
zap.String("query", restoreMetaSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
}
case table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits():
{
// this table has auto random id, we need rebase it
var autoRandColTp types.FieldType
for _, c := range table.Info.Columns {
if mysql.HasPriKeyFlag(c.Flag) {
autoRandColTp = c.FieldType
break
}
}
autoRandID := table.Info.AutoRandID
layout := autoid.NewShardIDLayout(&autoRandColTp, table.Info.AutoRandomBits)
autoRandCap := int64(layout.IncrementalBitsCapacity())
if autoRandID >= autoRandCap {
// autoRandID is overflow, so we need set it lower than max allow base.
// for example. max allow base is 288230376151711743 when autoRandomBits is 5.
// so we need set autoRandID to 288230376151711742
autoRandID = autoRandCap - 1
log.Info("table auto rand id overflow",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Int64("auto rand id", table.Info.AutoRandID),
)
}
// we can't merge two alter query, because
// it will cause Error: [ddl:8200]Unsupported multi schema change
alterAutoRandIDSQL := fmt.Sprintf(
"alter table %s.%s auto_random_base = %d",
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O),
autoRandID)

// trigger cycle round > 0
err = db.se.Execute(ctx, nextSeqSQL)
err = db.se.Execute(ctx, alterAutoRandIDSQL)
if err != nil {
log.Error("restore meta sql failed",
zap.String("query", nextSeqSQL),
log.Error("alter AutoRandID failed",
zap.String("query", alterAutoRandIDSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
}
restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID)
err = db.se.Execute(ctx, restoreMetaSQL)
} else {
var alterAutoIncIDFormat string
switch {
case table.Info.IsView():
return nil
default:
alterAutoIncIDFormat = "alter table %s.%s auto_increment = %d;"
}
restoreMetaSQL = fmt.Sprintf(
alterAutoIncIDFormat,
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O),
table.Info.AutoIncID)
if utils.NeedAutoID(table.Info) {
err = db.se.Execute(ctx, restoreMetaSQL)
}
}

if err != nil {
log.Error("restore meta sql failed",
zap.String("query", restoreMetaSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
return errors.Trace(err)
}
if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() {
// this table has auto random id, we need rebase it

// we can't merge two alter query, because
// it will cause Error: [ddl:8200]Unsupported multi schema change
alterAutoRandIDSQL := fmt.Sprintf(
"alter table %s.%s auto_random_base = %d",
utils.EncloseName(table.DB.Name.O),
utils.EncloseName(table.Info.Name.O),
table.Info.AutoRandID)

err = db.se.Execute(ctx, alterAutoRandIDSQL)
if err != nil {
log.Error("alter AutoRandID failed",
zap.String("query", alterAutoRandIDSQL),
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
}
}

return errors.Trace(err)
Expand Down
20 changes: 20 additions & 0 deletions tests/br_other/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,23 @@ run_br --version
run_br -V

run_sql "DROP DATABASE $DB;"

run_sql "CREATE DATABASE $DB;"
# generate table with auto_inc_id
run_sql "create table $DB.autoid(auto_inc_id bigint primary key auto_increment)"
run_sql "insert into $DB.autoid values(9223372036854775805)"

run_sql "create table $DB.autoid2(auto_inc_id bigint unsigned primary key auto_increment)"
run_sql "insert into $DB.autoid2 values (12345678901234567890)"

# backup db
echo "backup start overflow test..."
run_br backup db --db "$DB" -s "local://$TEST_DIR/$DB/autoid" --pd $PD_ADDR

run_sql "DROP DATABASE $DB;"

# restore db
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB/autoid" --pd $PD_ADDR

run_sql "DROP DATABASE $DB;"