diff --git a/br/tests/br_log_restore/run.sh b/br/tests/br_log_restore/run.sh deleted file mode 100755 index 3753e58267816..0000000000000 --- a/br/tests/br_log_restore/run.sh +++ /dev/null @@ -1,181 +0,0 @@ -#!/bin/bash -# -# Copyright 2020 PingCAP, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eux -DB="$TEST_NAME" -TABLE="usertable" -DB_COUNT=3 -BUCKET="cdcs3" -CDC_COUNT=3 - -# start the s3 server -export MINIO_ACCESS_KEY=brs3accesskey -export MINIO_SECRET_KEY=brs3secretkey -export MINIO_BROWSER=off -export AWS_ACCESS_KEY_ID=$MINIO_ACCESS_KEY -export AWS_SECRET_ACCESS_KEY=$MINIO_SECRET_KEY -export S3_ENDPOINT=127.0.0.1:24928 -rm -rf "$TEST_DIR/$DB" -mkdir -p "$TEST_DIR/$DB" -bin/minio server --address $S3_ENDPOINT "$TEST_DIR/$DB" & -i=0 -while ! curl -o /dev/null -s "http://$S3_ENDPOINT/"; do - i=$(($i+1)) - if [ $i -gt 30 ]; then - echo 'Failed to start minio' - exit 1 - fi - sleep 2 -done - -bin/mc config --config-dir "$TEST_DIR/$TEST_NAME" \ - host add minio http://$S3_ENDPOINT $MINIO_ACCESS_KEY $MINIO_SECRET_KEY -bin/mc mb --config-dir "$TEST_DIR/$TEST_NAME" minio/$BUCKET - -# Start cdc servers -run_cdc server --pd=https://$PD_ADDR --log-file=ticdc.log --addr=0.0.0.0:18301 --advertise-addr=127.0.0.1:18301 & -trap 'cat ticdc.log' ERR - -# TODO: remove this after TiCDC supports TiDB clustered index -run_sql "set @@global.tidb_enable_clustered_index=0" -# TiDB global variables cache 2 seconds -sleep 2 - -# create change feed for s3 log -run_cdc cli changefeed create --pd=https://$PD_ADDR --sink-uri="s3://$BUCKET/$DB?endpoint=http://$S3_ENDPOINT" --changefeed-id="simple-replication-task" - -start_ts=$(run_sql "show master status;" | grep Position | awk -F ':' '{print $2}' | xargs) - -# Fill in the database -for i in $(seq $DB_COUNT); do - run_sql "CREATE DATABASE $DB${i};" - go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i} -done - -for i in $(seq $DB_COUNT); do - row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') -done - -# test drop & create schema/table, finally only db2 has one row -run_sql "create schema ${DB}_DDL1;" -run_sql "create table ${DB}_DDL1.t1 (a int primary key, b varchar(10));" -run_sql "insert into ${DB}_DDL1.t1 values (1, 'x');" - -run_sql "drop schema ${DB}_DDL1;" -run_sql "create schema ${DB}_DDL1;" -run_sql "create schema ${DB}_DDL2;" - -run_sql "create table ${DB}_DDL2.t2 (a int primary key, b varchar(10));" -run_sql "insert into ${DB}_DDl2.t2 values (2, 'x');" - -run_sql "drop table ${DB}_DDL2.t2;" -run_sql "create table ${DB}_DDL2.t2 (a int primary key, b varchar(10));" -run_sql "insert into ${DB}_DDL2.t2 values (3, 'x');" -run_sql "delete from ${DB}_DDL2.t2 where a = 3;" -run_sql "insert into ${DB}_DDL2.t2 values (4, 'x');" - -end_ts=$(run_sql "show master status;" | grep Position | awk -F ':' '{print $2}' | xargs) - - -# if we restore with ts range [start_ts, end_ts], then the below record won't be restored. -run_sql "insert into ${DB}_DDL2.t2 values (5, 'x');" - -wait_time=0 -checkpoint_ts=$(run_cdc cli changefeed query -c simple-replication-task --pd=https://$PD_ADDR | jq '.status."checkpoint-ts"') -while [ "$checkpoint_ts" -lt "$end_ts" ]; do - echo "waiting for cdclog syncing... (checkpoint_ts = $checkpoint_ts; end_ts = $end_ts)" - if [ "$wait_time" -gt 300 ]; then - echo "cdc failed to sync after 300s, please check the CDC log." - exit 1 - fi - sleep 5 - wait_time=$(( wait_time + 5 )) - checkpoint_ts=$(run_cdc cli changefeed query -c simple-replication-task --pd=https://$PD_ADDR | jq '.status."checkpoint-ts"') -done - -# remove the change feed, because we don't want to record the drop ddl. -echo "Y" | run_cdc cli unsafe reset --pd=https://$PD_ADDR - -for i in $(seq $DB_COUNT); do - run_sql "DROP DATABASE $DB${i};" -done -run_sql "DROP DATABASE ${DB}_DDL1" -run_sql "DROP DATABASE ${DB}_DDL2" - -# restore full -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=return("notleader")' -echo "restore start..." -run_br restore cdclog -s "s3://$BUCKET/$DB" --pd $PD_ADDR --s3.endpoint="http://$S3_ENDPOINT" \ - --log-file "restore.log" --log-level "info" --start-ts $start_ts --end-ts $end_ts - -for i in $(seq $DB_COUNT); do - row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') -done - -fail=false -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=4;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "1" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on dml&ddl drop test." -fi - - -# record a=5 shouldn't be restore, because we set -end-ts without this record. -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=5;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "0" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on ts range test." -fi - -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=return("epochnotmatch")' -echo "restore again to restore a=5 record..." -run_br restore cdclog -s "s3://$BUCKET/$DB" --pd $PD_ADDR --s3.endpoint="http://$S3_ENDPOINT" \ - --log-file "restore.log" --log-level "info" --start-ts $end_ts - -# record a=5 should be restore, because we set -end-ts without this record. -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=5;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "1" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on recover ts range test." -fi - -# record a=3 should be deleted -row_count=$(run_sql "SELECT COUNT(*) FROM ${DB}_DDL2.t2 WHERE a=3;" | awk '/COUNT/{print $2}') -if [ "$row_count" -ne "0" ]; then - fail=true - echo "TEST: [$TEST_NAME] fail on key not deleted." -fi - - -for i in $(seq $DB_COUNT); do - if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then - fail=true - echo "TEST: [$TEST_NAME] fail on database $DB${i}" - fi - echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}" -done - -if $fail; then - echo "TEST: [$TEST_NAME] failed!" - exit 1 -fi - -for i in $(seq $DB_COUNT); do - run_sql "DROP DATABASE $DB${i};" -done - -run_sql "DROP DATABASE ${DB}_DDL1" -run_sql "DROP DATABASE ${DB}_DDL2" diff --git a/br/tests/br_log_restore/workload b/br/tests/br_log_restore/workload deleted file mode 100644 index 664fe7ee88228..0000000000000 --- a/br/tests/br_log_restore/workload +++ /dev/null @@ -1,12 +0,0 @@ -recordcount=1000 -operationcount=0 -workload=core - -readallfields=true - -readproportion=0 -updateproportion=0 -scanproportion=0 -insertproportion=0 - -requestdistribution=uniform diff --git a/br/tests/br_other/run.sh b/br/tests/br_other/run.sh index 313f2c5e273c0..79ffb9d2732e8 100644 --- a/br/tests/br_other/run.sh +++ b/br/tests/br_other/run.sh @@ -95,6 +95,8 @@ run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-siz run_curl https://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep -E "^0$" backup_fail=0 +# generate 1.sst to make another backup failed. +touch "$TEST_DIR/$DB/lock/1.sst" echo "another backup start expect to fail due to last backup add a lockfile" run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/lock" --concurrency 4 || backup_fail=1 if [ "$backup_fail" -ne "1" ];then diff --git a/ddl/ddl.go b/ddl/ddl.go index 1835eb9bfb28e..ebec94019105b 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -545,6 +545,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { d.limitJobCh <- task // worker should restart to continue handling tasks in limitJobCh, and send back through task.err err := <-task.err + if err != nil { + // The transaction of enqueuing job is failed. + return errors.Trace(err) + } ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index f158c3203abd3..08eb1f2bbae1d 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" ) @@ -129,6 +130,17 @@ func getSchemaVer(c *C, ctx sessionctx.Context) int64 { return ver } +func getSchemaVerT(t *testing.T, ctx sessionctx.Context) int64 { + err := ctx.NewTxn(context.Background()) + require.NoError(t, err) + txn, err := ctx.Txn(true) + require.NoError(t, err) + m := meta.NewMeta(txn) + ver, err := m.GetSchemaVersion() + require.NoError(t, err) + return ver +} + type historyJobArgs struct { ver int64 db *model.DBInfo @@ -146,6 +158,16 @@ func checkEqualTable(c *C, t1, t2 *model.TableInfo) { c.Assert(t1.AutoIncID, DeepEquals, t2.AutoIncID) } +func checkEqualTableT(t *testing.T, t1, t2 *model.TableInfo) { + require.Equal(t, t1.ID, t2.ID) + require.Equal(t, t1.Name, t2.Name) + require.Equal(t, t1.Charset, t2.Charset) + require.Equal(t, t1.Collate, t2.Collate) + require.EqualValues(t, t1.PKIsHandle, t2.PKIsHandle) + require.EqualValues(t, t1.Comment, t2.Comment) + require.EqualValues(t, t1.AutoIncID, t2.AutoIncID) +} + func checkHistoryJob(c *C, job *model.Job) { c.Assert(job.State, Equals, model.JobStateSynced) } @@ -173,6 +195,29 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo } } +func checkHistoryJobArgsT(t *testing.T, ctx sessionctx.Context, id int64, args *historyJobArgs) { + txn, err := ctx.Txn(true) + require.NoError(t, err) + tt := meta.NewMeta(txn) + historyJob, err := tt.GetHistoryDDLJob(id) + require.NoError(t, err) + require.Greater(t, historyJob.BinlogInfo.FinishedTS, uint64(0)) + + if args.tbl != nil { + require.Equal(t, args.ver, historyJob.BinlogInfo.SchemaVersion) + checkEqualTableT(t, historyJob.BinlogInfo.TableInfo, args.tbl) + return + } + + // for handling schema job + require.Equal(t, args.ver, historyJob.BinlogInfo.SchemaVersion) + require.EqualValues(t, args.db, historyJob.BinlogInfo.DBInfo) + // only for creating schema job + if args.db != nil && len(args.tblIDs) == 0 { + return + } +} + func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job { return &model.Job{ SchemaID: dbInfo.ID, diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 6cc3a77461c56..617c42c639d6c 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -301,6 +301,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { return errors.Trace(err) } } + failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr")) + } + }) return nil }) var jobs string @@ -310,7 +315,11 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, task.job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) } - logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs)) + if err != nil { + logutil.BgLogger().Warn("[ddl] add DDL jobs failed", zap.String("jobs", jobs), zap.Error(err)) + } else { + logutil.BgLogger().Info("[ddl] add DDL jobs", zap.Int("batch count", len(tasks)), zap.String("jobs", jobs)) + } } // getHistoryDDLJob gets a DDL job with job's ID from history queue. diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index d286e499ec12d..5321d12671b40 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -17,6 +17,7 @@ package ddl import ( "context" "sync" + "testing" "time" . "github.com/pingcap/check" @@ -36,6 +37,7 @@ import ( "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/testutil" + "github.com/stretchr/testify/require" ) var _ = Suite(&testDDLSuite{}) @@ -492,6 +494,32 @@ func (s *testDDLSuite) TestColumnError(c *C) { doDDLJobErr(c, dbInfo.ID, tblInfo.ID, model.ActionDropColumns, []interface{}{[]model.CIStr{model.NewCIStr("c5"), model.NewCIStr("c6")}, make([]bool, 2)}, ctx, d) } +func (s *testDDLSerialSuite) TestAddBatchJobError(c *C) { + store := testCreateStore(c, "test_add_batch_job_error") + defer func() { + err := store.Close() + c.Assert(err, IsNil) + }() + d, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + c.Assert(err, IsNil) + defer func() { + err := d.Stop() + c.Assert(err, IsNil) + }() + ctx := testNewContext(d) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr", `return(true)`), IsNil) + // Test the job runner should not hang forever. + job := &model.Job{SchemaID: 1, TableID: 1} + err = d.doDDLJob(ctx, job) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "mockAddBatchDDLJobsErr") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockAddBatchDDLJobsErr"), IsNil) +} + func testCheckOwner(c *C, d *ddl, expectedVal bool) { c.Assert(d.isOwner(), Equals, expectedVal) } @@ -513,6 +541,37 @@ func testCheckJobDone(c *C, d *ddl, job *model.Job, isAdd bool) { c.Assert(err, IsNil) } +func testCheckJobDoneT(t *testing.T, d *ddl, job *model.Job, isAdd bool) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + historyJob, err := tt.GetHistoryDDLJob(job.ID) + require.NoError(t, err) + require.Equal(t, model.JobStateSynced, historyJob.State) + if isAdd { + require.Equal(t, model.StatePublic, historyJob.SchemaState) + } else { + require.Equal(t, model.StateNone, historyJob.SchemaState) + } + + return nil + }) + require.NoError(t, err) +} + +func testCheckJobCancelledT(t *testing.T, d *ddl, job *model.Job, state *model.SchemaState) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + tt := meta.NewMeta(txn) + historyJob, err := tt.GetHistoryDDLJob(job.ID) + require.NoError(t, err) + require.True(t, historyJob.IsCancelled() || historyJob.IsRollbackDone(), "history job %s", historyJob) + if state != nil { + require.Equal(t, *state, historyJob.SchemaState) + } + return nil + }) + require.NoError(t, err) +} + func testCheckJobCancelled(c *C, d *ddl, job *model.Job, state *model.SchemaState) { err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { t := meta.NewMeta(txn) @@ -544,6 +603,23 @@ func doDDLJobErrWithSchemaState(ctx sessionctx.Context, d *ddl, c *C, schemaID, return job } +func doDDLJobErrWithSchemaStateT(ctx sessionctx.Context, d *ddl, t *testing.T, schemaID, tableID int64, tp model.ActionType, + args []interface{}, state *model.SchemaState) *model.Job { + job := &model.Job{ + SchemaID: schemaID, + TableID: tableID, + Type: tp, + Args: args, + BinlogInfo: &model.HistoryInfo{}, + } + err := d.doDDLJob(ctx, job) + // TODO: Add the detail error check. + require.Error(t, err, "err:%v", err) + testCheckJobCancelledT(t, d, job, state) + + return job +} + func doDDLJobSuccess(ctx sessionctx.Context, d *ddl, c *C, schemaID, tableID int64, tp model.ActionType, args []interface{}) { job := &model.Job{ @@ -562,6 +638,11 @@ func doDDLJobErr(c *C, schemaID, tableID int64, tp model.ActionType, args []inte return doDDLJobErrWithSchemaState(ctx, d, c, schemaID, tableID, tp, args, nil) } +func doDDLJobErrT(t *testing.T, schemaID, tableID int64, tp model.ActionType, args []interface{}, + ctx sessionctx.Context, d *ddl) *model.Job { + return doDDLJobErrWithSchemaStateT(ctx, d, t, schemaID, tableID, tp, args, nil) +} + func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) error { var checkErr error addIndexFirstReorg := (test.act == model.ActionAddIndex || test.act == model.ActionAddPrimaryKey) && diff --git a/ddl/restart_test.go b/ddl/restart_test.go index 617e073635c0f..e1cf6d15fee84 100644 --- a/ddl/restart_test.go +++ b/ddl/restart_test.go @@ -22,6 +22,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/mock" @@ -201,6 +202,38 @@ LOOP: } } +var _ = Suite(&testTableSuite{}) + +type testTableSuite struct { + store kv.Storage + dbInfo *model.DBInfo + + d *ddl +} + +func (s *testTableSuite) SetUpSuite(c *C) { + s.store = testCreateStore(c, "test_table") + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(s.store), + WithLease(testLease), + ) + c.Assert(err, IsNil) + s.d = ddl + + s.dbInfo, err = testSchemaInfo(s.d, "test_table") + c.Assert(err, IsNil) + testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo) +} + +func (s *testTableSuite) TearDownSuite(c *C) { + testDropSchema(c, testNewContext(s.d), s.d, s.dbInfo) + err := s.d.Stop() + c.Assert(err, IsNil) + err = s.store.Close() + c.Assert(err, IsNil) +} + func (s *testTableSuite) TestTableResume(c *C) { d := s.d diff --git a/ddl/schema_test.go b/ddl/schema_test.go index 37652f83efa0b..2ad14b417d8ec 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "testing" "time" . "github.com/pingcap/check" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/require" ) var _ = Suite(&testSchemaSuite{}) @@ -68,6 +70,23 @@ func testCreateSchema(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo return job } +func testCreateSchemaT(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) *model.Job { + job := &model.Job{ + SchemaID: dbInfo.ID, + Type: model.ActionCreateSchema, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{dbInfo}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + dbInfo.State = model.StatePublic + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, db: dbInfo}) + dbInfo.State = model.StateNone + return job +} + func buildDropSchemaJob(dbInfo *model.DBInfo) *model.Job { return &model.Job{ SchemaID: dbInfo.ID, @@ -84,6 +103,14 @@ func testDropSchema(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) return job, ver } +func testDropSchemaT(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo) (*model.Job, int64) { + job := buildDropSchemaJob(dbInfo) + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + ver := getSchemaVerT(t, ctx) + return job, ver +} + func isDDLJobDone(c *C, t *meta.Meta) bool { job, err := t.GetDDLJobByIdx(0) c.Assert(err, IsNil) diff --git a/ddl/table_test.go b/ddl/table_test.go index 7c1a69a407e22..f366c090b1686 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -15,163 +15,22 @@ package ddl import ( - "bytes" "context" - "fmt" + "testing" - . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/require" ) -var _ = Suite(&testTableSuite{}) - -type testTableSuite struct { - store kv.Storage - dbInfo *model.DBInfo - - d *ddl -} - -func testTableInfoWith2IndexOnFirstColumn(c *C, d *ddl, name string, num int) *model.TableInfo { - normalInfo, err := testTableInfo(d, name, num) - c.Assert(err, IsNil) - idxs := make([]*model.IndexInfo, 0, 2) - for i := range idxs { - idx := &model.IndexInfo{ - Name: model.NewCIStr(fmt.Sprintf("i%d", i+1)), - State: model.StatePublic, - Columns: []*model.IndexColumn{{Name: model.NewCIStr("c1")}}, - } - idxs = append(idxs, idx) - } - normalInfo.Indices = idxs - normalInfo.Columns[0].FieldType.Flen = 11 - return normalInfo -} - -// testTableInfo creates a test table with num int columns and with no index. -func testTableInfo(d *ddl, name string, num int) (*model.TableInfo, error) { - tblInfo := &model.TableInfo{ - Name: model.NewCIStr(name), - } - genIDs, err := d.genGlobalIDs(1) - - if err != nil { - return nil, err - } - tblInfo.ID = genIDs[0] - - cols := make([]*model.ColumnInfo, num) - for i := range cols { - col := &model.ColumnInfo{ - Name: model.NewCIStr(fmt.Sprintf("c%d", i+1)), - Offset: i, - DefaultValue: i + 1, - State: model.StatePublic, - } - - col.FieldType = *types.NewFieldType(mysql.TypeLong) - col.ID = allocateColumnID(tblInfo) - cols[i] = col - } - tblInfo.Columns = cols - tblInfo.Charset = "utf8" - tblInfo.Collate = "utf8_bin" - return tblInfo, nil -} - -// testTableInfoWithPartition creates a test table with num int columns and with no index. -func testTableInfoWithPartition(c *C, d *ddl, name string, num int) *model.TableInfo { - tblInfo, err := testTableInfo(d, name, num) - c.Assert(err, IsNil) - genIDs, err := d.genGlobalIDs(1) - c.Assert(err, IsNil) - pid := genIDs[0] - tblInfo.Partition = &model.PartitionInfo{ - Type: model.PartitionTypeRange, - Expr: tblInfo.Columns[0].Name.L, - Enable: true, - Definitions: []model.PartitionDefinition{{ - ID: pid, - Name: model.NewCIStr("p0"), - LessThan: []string{"maxvalue"}, - }}, - } - - return tblInfo -} - -// testTableInfoWithPartitionLessThan creates a test table with num int columns and one partition specified with lessthan. -func testTableInfoWithPartitionLessThan(c *C, d *ddl, name string, num int, lessthan string) *model.TableInfo { - tblInfo := testTableInfoWithPartition(c, d, name, num) - tblInfo.Partition.Definitions[0].LessThan = []string{lessthan} - return tblInfo -} - -func testAddedNewTablePartitionInfo(c *C, d *ddl, tblInfo *model.TableInfo, partName, lessthan string) *model.PartitionInfo { - genIDs, err := d.genGlobalIDs(1) - c.Assert(err, IsNil) - pid := genIDs[0] - // the new added partition should change the partition state to state none at the beginning. - return &model.PartitionInfo{ - Type: model.PartitionTypeRange, - Expr: tblInfo.Columns[0].Name.L, - Enable: true, - Definitions: []model.PartitionDefinition{{ - ID: pid, - Name: model.NewCIStr(partName), - LessThan: []string{lessthan}, - }}, - } -} - -// testViewInfo creates a test view with num int columns. -func testViewInfo(c *C, d *ddl, name string, num int) *model.TableInfo { - tblInfo := &model.TableInfo{ - Name: model.NewCIStr(name), - } - genIDs, err := d.genGlobalIDs(1) - c.Assert(err, IsNil) - tblInfo.ID = genIDs[0] - - cols := make([]*model.ColumnInfo, num) - viewCols := make([]model.CIStr, num) - - var stmtBuffer bytes.Buffer - stmtBuffer.WriteString("SELECT ") - for i := range cols { - col := &model.ColumnInfo{ - Name: model.NewCIStr(fmt.Sprintf("c%d", i+1)), - Offset: i, - State: model.StatePublic, - } - - col.ID = allocateColumnID(tblInfo) - cols[i] = col - viewCols[i] = col.Name - stmtBuffer.WriteString(cols[i].Name.L + ",") - } - stmtBuffer.WriteString("1 FROM t") - - view := model.ViewInfo{Cols: viewCols, Security: model.SecurityDefiner, Algorithm: model.AlgorithmMerge, - SelectStmt: stmtBuffer.String(), CheckOption: model.CheckOptionCascaded, Definer: &auth.UserIdentity{CurrentUser: true}} - - tblInfo.View = &view - tblInfo.Columns = cols - - return tblInfo -} - -func testCreateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { +func testCreateTableT(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: dbInfo.ID, TableID: tblInfo.ID, @@ -180,36 +39,16 @@ func testCreateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, Args: []interface{}{tblInfo}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) - - v := getSchemaVer(c, ctx) - tblInfo.State = model.StatePublic - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) - tblInfo.State = model.StateNone - return job -} - -func testCreateView(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { - job := &model.Job{ - SchemaID: dbInfo.ID, - TableID: tblInfo.ID, - Type: model.ActionCreateView, - BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{tblInfo, false, 0}, - } - - c.Assert(tblInfo.IsView(), IsTrue) - err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) + v := getSchemaVerT(t, ctx) tblInfo.State = model.StatePublic - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) tblInfo.State = model.StateNone return job } -func testRenameTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID, oldSchemaID int64, oldSchemaName model.CIStr, tblInfo *model.TableInfo) *model.Job { +func testRenameTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID, oldSchemaID int64, oldSchemaName model.CIStr, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: newSchemaID, TableID: tblInfo.ID, @@ -218,16 +57,16 @@ func testRenameTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID, oldSchem Args: []interface{}{oldSchemaID, tblInfo.Name, oldSchemaName}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) + v := getSchemaVerT(t, ctx) tblInfo.State = model.StatePublic - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) tblInfo.State = model.StateNone return job } -func testLockTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { +func testLockTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { arg := &lockTablesArg{ LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, SessionInfo: model.SessionInfo{ @@ -243,32 +82,33 @@ func testLockTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblI Args: []interface{}{arg}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) return job } -func checkTableLockedTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, serverID string, sessionID uint64, lockTp model.TableLockType) { +func checkTableLockedTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, serverID string, sessionID uint64, lockTp model.TableLockType) { err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - info, err := t.GetTable(dbInfo.ID, tblInfo.ID) - c.Assert(err, IsNil) - - c.Assert(info, NotNil) - c.Assert(info.Lock, NotNil) - c.Assert(len(info.Lock.Sessions) == 1, IsTrue) - c.Assert(info.Lock.Sessions[0].ServerID, Equals, serverID) - c.Assert(info.Lock.Sessions[0].SessionID, Equals, sessionID) - c.Assert(info.Lock.Tp, Equals, lockTp) - c.Assert(info.Lock.State, Equals, model.TableLockStatePublic) + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + + require.NotNil(t, info) + require.NotNil(t, info.Lock) + require.Len(t, info.Lock.Sessions, 1) + require.Equal(t, serverID, info.Lock.Sessions[0].ServerID) + require.Equal(t, sessionID, info.Lock.Sessions[0].SessionID) + require.Equal(t, lockTp, info.Lock.Tp) + require.Equal(t, lockTp, info.Lock.Tp) + require.Equal(t, model.TableLockStatePublic, info.Lock.State) return nil }) - c.Assert(err, IsNil) + require.NoError(t, err) } -func testDropTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { +func testDropTableT(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: dbInfo.ID, TableID: tblInfo.ID, @@ -276,16 +116,16 @@ func testDropTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, t BinlogInfo: &model.HistoryInfo{}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } -func testTruncateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { +func testTruncateTable(t *testing.T, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { genIDs, err := d.genGlobalIDs(1) - c.Assert(err, IsNil) + require.NoError(t, err) newTableID := genIDs[0] job := &model.Job{ SchemaID: dbInfo.ID, @@ -295,35 +135,35 @@ func testTruncateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInf Args: []interface{}{newTableID}, } err = d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) + v := getSchemaVerT(t, ctx) tblInfo.ID = newTableID - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } -func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { +func testCheckTableStateT(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - info, err := t.GetTable(dbInfo.ID, tblInfo.ID) - c.Assert(err, IsNil) + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) if state == model.StateNone { - c.Assert(info, IsNil) + require.Nil(t, info) return nil } - c.Assert(info.Name, DeepEquals, tblInfo.Name) - c.Assert(info.State, Equals, state) + require.EqualValues(t, tblInfo.Name, info.Name) + require.Equal(t, state, info.State) return nil }) - c.Assert(err, IsNil) + require.NoError(t, err) } -func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table { +func testGetTableT(t *testing.T, d *ddl, schemaID int64, tableID int64) table.Table { tbl, err := testGetTableWithError(d, schemaID, tableID) - c.Assert(err, IsNil) + require.NoError(t, err) return tbl } @@ -352,116 +192,109 @@ func testGetTableWithError(d *ddl, schemaID, tableID int64) (table.Table, error) return tbl, nil } -func (s *testTableSuite) SetUpSuite(c *C) { - s.store = testCreateStore(c, "test_table") +func TestTable(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) ddl, err := testNewDDLAndStart( context.Background(), - WithStore(s.store), + WithStore(store), WithLease(testLease), ) - c.Assert(err, IsNil) - s.d = ddl - - s.dbInfo, err = testSchemaInfo(s.d, "test_table") - c.Assert(err, IsNil) - testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo) -} - -func (s *testTableSuite) TearDownSuite(c *C) { - testDropSchema(c, testNewContext(s.d), s.d, s.dbInfo) - err := s.d.Stop() - c.Assert(err, IsNil) - err = s.store.Close() - c.Assert(err, IsNil) -} + require.NoError(t, err) -func (s *testTableSuite) TestTable(c *C) { - d := s.d + dbInfo, err := testSchemaInfo(ddl, "test_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) - ctx := testNewContext(d) + ctx := testNewContext(ddl) - tblInfo, err := testTableInfo(d, "t", 3) - c.Assert(err, IsNil) - job := testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) + tblInfo, err := testTableInfo(ddl, "t", 3) + require.NoError(t, err) + job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) // Create an existing table. - newTblInfo, err := testTableInfo(d, "t", 3) - c.Assert(err, IsNil) - doDDLJobErr(c, s.dbInfo.ID, newTblInfo.ID, model.ActionCreateTable, []interface{}{newTblInfo}, ctx, d) + newTblInfo, err := testTableInfo(ddl, "t", 3) + require.NoError(t, err) + doDDLJobErrT(t, dbInfo.ID, newTblInfo.ID, model.ActionCreateTable, []interface{}{newTblInfo}, ctx, ddl) count := 2000 - tbl := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + tbl := testGetTableT(t, ddl, dbInfo.ID, tblInfo.ID) for i := 1; i <= count; i++ { _, err := tbl.AddRecord(ctx, types.MakeDatums(i, i, i)) - c.Assert(err, IsNil) + require.NoError(t, err) } - job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) - testCheckJobDone(c, d, job, false) + job = testDropTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckJobDoneT(t, ddl, job, false) // for truncate table - tblInfo, err = testTableInfo(d, "tt", 3) - c.Assert(err, IsNil) - job = testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) - job = testTruncateTable(c, ctx, d, s.dbInfo, tblInfo) - testCheckTableState(c, d, s.dbInfo, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) + tblInfo, err = testTableInfo(ddl, "tt", 3) + require.NoError(t, err) + job = testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + job = testTruncateTable(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) // for rename table - dbInfo1, err := testSchemaInfo(s.d, "test_rename_table") - c.Assert(err, IsNil) - testCreateSchema(c, testNewContext(s.d), s.d, dbInfo1) - job = testRenameTable(c, ctx, d, dbInfo1.ID, s.dbInfo.ID, s.dbInfo.Name, tblInfo) - testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) - - job = testLockTable(c, ctx, d, dbInfo1.ID, tblInfo, model.TableLockWrite) - testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) - checkTableLockedTest(c, d, dbInfo1, tblInfo, d.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) + dbInfo1, err := testSchemaInfo(ddl, "test_rename_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo1) + job = testRenameTable(t, ctx, ddl, dbInfo1.ID, dbInfo.ID, dbInfo.Name, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + + job = testLockTable(t, ctx, ddl, dbInfo1.ID, tblInfo, model.TableLockWrite) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableLockedTest(t, ddl, dbInfo1, tblInfo, ddl.GetID(), ctx.GetSessionVars().ConnectionID, model.TableLockWrite) // for alter cache table - job = testAlterCacheTable(c, ctx, d, dbInfo1.ID, tblInfo) - testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) - checkTableCacheTest(c, d, dbInfo1, tblInfo) + job = testAlterCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableCacheTest(t, ddl, dbInfo1, tblInfo) // for alter no cache table - job = testAlterNoCacheTable(c, ctx, d, dbInfo1.ID, tblInfo) - testCheckTableState(c, d, dbInfo1, tblInfo, model.StatePublic) - testCheckJobDone(c, d, job, true) - checkTableNoCacheTest(c, d, dbInfo1, tblInfo) + job = testAlterNoCacheTable(t, ctx, ddl, dbInfo1.ID, tblInfo) + testCheckTableStateT(t, ddl, dbInfo1, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + checkTableNoCacheTest(t, ddl, dbInfo1, tblInfo) + + testDropSchemaT(t, testNewContext(ddl), ddl, dbInfo) + err = ddl.Stop() + require.NoError(t, err) + err = store.Close() + require.NoError(t, err) } -func checkTableCacheTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { +func checkTableCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - info, err := t.GetTable(dbInfo.ID, tblInfo.ID) - c.Assert(err, IsNil) - c.Assert(info, NotNil) - c.Assert(info.TableCacheStatusType, NotNil) - c.Assert(info.TableCacheStatusType, Equals, model.TableCacheStatusEnable) + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.NotNil(t, info.TableCacheStatusType) + require.Equal(t, model.TableCacheStatusEnable, info.TableCacheStatusType) return nil }) - c.Assert(err, IsNil) + require.NoError(t, err) } -func checkTableNoCacheTest(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { +func checkTableNoCacheTest(t *testing.T, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { - t := meta.NewMeta(txn) - info, err := t.GetTable(dbInfo.ID, tblInfo.ID) - c.Assert(err, IsNil) - c.Assert(info, NotNil) - c.Assert(info.TableCacheStatusType, Equals, model.TableCacheStatusDisable) + tt := meta.NewMeta(txn) + info, err := tt.GetTable(dbInfo.ID, tblInfo.ID) + require.NoError(t, err) + require.NotNil(t, info) + require.Equal(t, model.TableCacheStatusDisable, info.TableCacheStatusType) return nil }) - c.Assert(err, IsNil) + require.NoError(t, err) } -func testAlterCacheTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { - +func testAlterCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: newSchemaID, TableID: tblInfo.ID, @@ -470,14 +303,14 @@ func testAlterCacheTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64 Args: []interface{}{}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) return job } -func testAlterNoCacheTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { +func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo) *model.Job { job := &model.Job{ SchemaID: newSchemaID, @@ -487,28 +320,9 @@ func testAlterNoCacheTable(c *C, ctx sessionctx.Context, d *ddl, newSchemaID int Args: []interface{}{}, } err := d.doDDLJob(ctx, job) - c.Assert(err, IsNil) + require.NoError(t, err) - v := getSchemaVer(c, ctx) - checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v}) + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) return job } - -// for drop indexes -func createTestTableForDropIndexes(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, name string, num int) *model.TableInfo { - tableInfo, err := testTableInfo(d, name, num) - c.Assert(err, IsNil) - var idxs []*model.IndexInfo - for i := 0; i < num; i++ { - idxName := model.NewCIStr(fmt.Sprintf("i%d", i+1)) - idx := &model.IndexInfo{ - Name: idxName, - State: model.StatePublic, - Columns: []*model.IndexColumn{{Name: model.NewCIStr(fmt.Sprintf("c%d", i+1))}}, - } - idxs = append(idxs, idx) - } - tableInfo.Indices = idxs - testCreateTable(c, ctx, d, dbInfo, tableInfo) - return tableInfo -} diff --git a/ddl/util_test.go b/ddl/util_test.go new file mode 100644 index 0000000000000..85baca2ffdd43 --- /dev/null +++ b/ddl/util_test.go @@ -0,0 +1,257 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "bytes" + "context" + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/parser/auth" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" +) + +func testTableInfoWith2IndexOnFirstColumn(c *C, d *ddl, name string, num int) *model.TableInfo { + normalInfo, err := testTableInfo(d, name, num) + c.Assert(err, IsNil) + idxs := make([]*model.IndexInfo, 0, 2) + for i := range idxs { + idx := &model.IndexInfo{ + Name: model.NewCIStr(fmt.Sprintf("i%d", i+1)), + State: model.StatePublic, + Columns: []*model.IndexColumn{{Name: model.NewCIStr("c1")}}, + } + idxs = append(idxs, idx) + } + normalInfo.Indices = idxs + normalInfo.Columns[0].FieldType.Flen = 11 + return normalInfo +} + +// testTableInfo creates a test table with num int columns and with no index. +func testTableInfo(d *ddl, name string, num int) (*model.TableInfo, error) { + tblInfo := &model.TableInfo{ + Name: model.NewCIStr(name), + } + genIDs, err := d.genGlobalIDs(1) + + if err != nil { + return nil, err + } + tblInfo.ID = genIDs[0] + + cols := make([]*model.ColumnInfo, num) + for i := range cols { + col := &model.ColumnInfo{ + Name: model.NewCIStr(fmt.Sprintf("c%d", i+1)), + Offset: i, + DefaultValue: i + 1, + State: model.StatePublic, + } + + col.FieldType = *types.NewFieldType(mysql.TypeLong) + col.ID = allocateColumnID(tblInfo) + cols[i] = col + } + tblInfo.Columns = cols + tblInfo.Charset = "utf8" + tblInfo.Collate = "utf8_bin" + return tblInfo, nil +} + +// testTableInfoWithPartition creates a test table with num int columns and with no index. +func testTableInfoWithPartition(c *C, d *ddl, name string, num int) *model.TableInfo { + tblInfo, err := testTableInfo(d, name, num) + c.Assert(err, IsNil) + genIDs, err := d.genGlobalIDs(1) + c.Assert(err, IsNil) + pid := genIDs[0] + tblInfo.Partition = &model.PartitionInfo{ + Type: model.PartitionTypeRange, + Expr: tblInfo.Columns[0].Name.L, + Enable: true, + Definitions: []model.PartitionDefinition{{ + ID: pid, + Name: model.NewCIStr("p0"), + LessThan: []string{"maxvalue"}, + }}, + } + + return tblInfo +} + +// testTableInfoWithPartitionLessThan creates a test table with num int columns and one partition specified with lessthan. +func testTableInfoWithPartitionLessThan(c *C, d *ddl, name string, num int, lessthan string) *model.TableInfo { + tblInfo := testTableInfoWithPartition(c, d, name, num) + tblInfo.Partition.Definitions[0].LessThan = []string{lessthan} + return tblInfo +} + +func testAddedNewTablePartitionInfo(c *C, d *ddl, tblInfo *model.TableInfo, partName, lessthan string) *model.PartitionInfo { + genIDs, err := d.genGlobalIDs(1) + c.Assert(err, IsNil) + pid := genIDs[0] + // the new added partition should change the partition state to state none at the beginning. + return &model.PartitionInfo{ + Type: model.PartitionTypeRange, + Expr: tblInfo.Columns[0].Name.L, + Enable: true, + Definitions: []model.PartitionDefinition{{ + ID: pid, + Name: model.NewCIStr(partName), + LessThan: []string{lessthan}, + }}, + } +} + +// testViewInfo creates a test view with num int columns. +func testViewInfo(c *C, d *ddl, name string, num int) *model.TableInfo { + tblInfo := &model.TableInfo{ + Name: model.NewCIStr(name), + } + genIDs, err := d.genGlobalIDs(1) + c.Assert(err, IsNil) + tblInfo.ID = genIDs[0] + + cols := make([]*model.ColumnInfo, num) + viewCols := make([]model.CIStr, num) + + var stmtBuffer bytes.Buffer + stmtBuffer.WriteString("SELECT ") + for i := range cols { + col := &model.ColumnInfo{ + Name: model.NewCIStr(fmt.Sprintf("c%d", i+1)), + Offset: i, + State: model.StatePublic, + } + + col.ID = allocateColumnID(tblInfo) + cols[i] = col + viewCols[i] = col.Name + stmtBuffer.WriteString(cols[i].Name.L + ",") + } + stmtBuffer.WriteString("1 FROM t") + + view := model.ViewInfo{Cols: viewCols, Security: model.SecurityDefiner, Algorithm: model.AlgorithmMerge, + SelectStmt: stmtBuffer.String(), CheckOption: model.CheckOptionCascaded, Definer: &auth.UserIdentity{CurrentUser: true}} + + tblInfo.View = &view + tblInfo.Columns = cols + + return tblInfo +} + +func testCreateTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { + job := &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionCreateTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{tblInfo}, + } + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + tblInfo.State = model.StatePublic + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + tblInfo.State = model.StateNone + return job +} + +func testCreateView(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { + job := &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionCreateView, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{tblInfo, false, 0}, + } + + c.Assert(tblInfo.IsView(), IsTrue) + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + tblInfo.State = model.StatePublic + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + tblInfo.State = model.StateNone + return job +} + +func testDropTable(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) *model.Job { + job := &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionDropTable, + BinlogInfo: &model.HistoryInfo{}, + } + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + return job +} + +func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, state model.SchemaState) { + err := kv.RunInNewTxn(context.Background(), d.store, false, func(ctx context.Context, txn kv.Transaction) error { + t := meta.NewMeta(txn) + info, err := t.GetTable(dbInfo.ID, tblInfo.ID) + c.Assert(err, IsNil) + + if state == model.StateNone { + c.Assert(info, IsNil) + return nil + } + + c.Assert(info.Name, DeepEquals, tblInfo.Name) + c.Assert(info.State, Equals, state) + return nil + }) + c.Assert(err, IsNil) +} + +func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table { + tbl, err := testGetTableWithError(d, schemaID, tableID) + c.Assert(err, IsNil) + return tbl +} + +// for drop indexes +func createTestTableForDropIndexes(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, name string, num int) *model.TableInfo { + tableInfo, err := testTableInfo(d, name, num) + c.Assert(err, IsNil) + var idxs []*model.IndexInfo + for i := 0; i < num; i++ { + idxName := model.NewCIStr(fmt.Sprintf("i%d", i+1)) + idx := &model.IndexInfo{ + Name: idxName, + State: model.StatePublic, + Columns: []*model.IndexColumn{{Name: model.NewCIStr(fmt.Sprintf("c%d", i+1))}}, + } + idxs = append(idxs, idx) + } + tableInfo.Indices = idxs + testCreateTable(c, ctx, d, dbInfo, tableInfo) + return tableInfo +} diff --git a/server/plan_replayer.go b/server/plan_replayer.go index a363781f0b8c9..1d9a6a957f06e 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -16,7 +16,7 @@ package server import ( "fmt" - "io" + "io/ioutil" "net/http" "os" "path/filepath" @@ -69,15 +69,18 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req params := mux.Vars(req) name := params[pFileName] path := handler.filePath - if isExists(path) { - w.Header().Set("Content-Type", "application/zip") - w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename)) + exist, err := isExists(path) + if err != nil { + writeError(w, err) + return + } + if exist { file, err := os.Open(path) if err != nil { writeError(w, err) return } - _, err = io.Copy(w, file) + content, err := ioutil.ReadAll(file) if err != nil { writeError(w, err) return @@ -92,7 +95,13 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req writeError(w, err) return } - w.WriteHeader(http.StatusOK) + _, err = w.Write(content) + if err != nil { + writeError(w, err) + return + } + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename)) return } if handler.infoGetter == nil { @@ -126,10 +135,7 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req if resp.StatusCode != http.StatusOK { continue } - // find dump file in one remote tidb-server, return file directly - w.Header().Set("Content-Type", "application/zip") - w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename)) - _, err = io.Copy(w, resp.Body) + content, err := ioutil.ReadAll(resp.Body) if err != nil { writeError(w, err) return @@ -139,11 +145,18 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req writeError(w, err) return } - w.WriteHeader(http.StatusOK) + _, err = w.Write(content) + if err != nil { + writeError(w, err) + return + } + // find dump file in one remote tidb-server, return file directly + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename)) return } // we can't find dump file in any tidb-server, return 404 directly - logutil.BgLogger().Info("can't find dump file in any remote server", zap.String("filename", name)) + logutil.BgLogger().Error("can't find dump file in any remote server", zap.String("filename", name)) w.WriteHeader(http.StatusNotFound) } @@ -157,10 +170,13 @@ type downloadFileHandler struct { downloadedFilename string } -func isExists(path string) bool { +func isExists(path string) (bool, error) { _, err := os.Stat(path) - if err != nil && !os.IsExist(err) { - return false + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err } - return true + return true, nil }