Skip to content

Commit

Permalink
Merge branch 'master' into issue-28851
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall committed Dec 6, 2021
2 parents 569b82a + 7203707 commit d4cb75e
Show file tree
Hide file tree
Showing 12 changed files with 616 additions and 521 deletions.
181 changes: 0 additions & 181 deletions br/tests/br_log_restore/run.sh

This file was deleted.

12 changes: 0 additions & 12 deletions br/tests/br_log_restore/workload

This file was deleted.

2 changes: 2 additions & 0 deletions br/tests/br_other/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-siz
run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep -E "^0$"

backup_fail=0
# generate 1.sst to make another backup failed.
touch "$TEST_DIR/$DB/lock/1.sst"
echo "another backup start expect to fail due to last backup add a lockfile"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1
if [ "$backup_fail" -ne "1" ];then
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
d.limitJobCh <- task
// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
err := <-task.err
if err != nil {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
}

ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

Expand Down
45 changes: 45 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

Expand Down Expand Up @@ -129,6 +130,17 @@ func getSchemaVer(c *C, ctx sessionctx.Context) int64 {
return ver
}

func getSchemaVerT(t *testing.T, ctx sessionctx.Context) int64 {
err := ctx.NewTxn(context.Background())
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
ver, err := m.GetSchemaVersion()
require.NoError(t, err)
return ver
}

type historyJobArgs struct {
ver int64
db *model.DBInfo
Expand All @@ -146,6 +158,16 @@ func checkEqualTable(c *C, t1, t2 *model.TableInfo) {
c.Assert(t1.AutoIncID, DeepEquals, t2.AutoIncID)
}

func checkEqualTableT(t *testing.T, t1, t2 *model.TableInfo) {
require.Equal(t, t1.ID, t2.ID)
require.Equal(t, t1.Name, t2.Name)
require.Equal(t, t1.Charset, t2.Charset)
require.Equal(t, t1.Collate, t2.Collate)
require.EqualValues(t, t1.PKIsHandle, t2.PKIsHandle)
require.EqualValues(t, t1.Comment, t2.Comment)
require.EqualValues(t, t1.AutoIncID, t2.AutoIncID)
}

func checkHistoryJob(c *C, job *model.Job) {
c.Assert(job.State, Equals, model.JobStateSynced)
}
Expand Down Expand Up @@ -173,6 +195,29 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo
}
}

func checkHistoryJobArgsT(t *testing.T, ctx sessionctx.Context, id int64, args *historyJobArgs) {
txn, err := ctx.Txn(true)
require.NoError(t, err)
tt := meta.NewMeta(txn)
historyJob, err := tt.GetHistoryDDLJob(id)
require.NoError(t, err)
require.Greater(t, historyJob.BinlogInfo.FinishedTS, uint64(0))

if args.tbl != nil {
require.Equal(t, args.ver, historyJob.BinlogInfo.SchemaVersion)
checkEqualTableT(t, historyJob.BinlogInfo.TableInfo, args.tbl)
return
}

// for handling schema job
require.Equal(t, args.ver, historyJob.BinlogInfo.SchemaVersion)
require.EqualValues(t, args.db, historyJob.BinlogInfo.DBInfo)
// only for creating schema job
if args.db != nil && len(args.tblIDs) == 0 {
return
}
}

func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
Expand Down
11 changes: 10 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
return errors.Trace(err)
}
}
failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
}
})
return nil
})
var jobs string
Expand All @@ -310,7 +315,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(),
metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
if err != nil {
logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs))
}
}

// getHistoryDDLJob gets a DDL job with job's ID from history queue.
Expand Down
Loading

0 comments on commit d4cb75e

Please sign in to comment.