diff --git a/cmd/ddltest/ddl_test.go b/cmd/ddltest/ddl_test.go index 52009b10de142..36922638950a8 100644 --- a/cmd/ddltest/ddl_test.go +++ b/cmd/ddltest/ddl_test.go @@ -143,7 +143,7 @@ func (s *TestDDLSuite) SetUpSuite(c *C) { s.procs = make([]*server, *serverNum) // Set server restart retry count. - s.retryCount = 5 + s.retryCount = 20 createLogFiles(c, *serverNum) err = s.startServers() diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 528a6087638fc..a3f8bb7f9c622 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -5895,8 +5895,8 @@ func buildPlacementSpecReplicasAndConstraint(replicas uint64, cnstr string) ([]* } rules = append(rules, &placement.Rule{ - Count: int(replicas), - LabelConstraints: labelConstraints, + Count: int(replicas), + Constraints: labelConstraints, }) return rules, nil @@ -5925,8 +5925,8 @@ func buildPlacementSpecReplicasAndConstraint(replicas uint64, cnstr string) ([]* } rules = append(rules, &placement.Rule{ - Count: cnt, - LabelConstraints: labelConstraints, + Count: cnt, + Constraints: labelConstraints, }) } @@ -6051,14 +6051,14 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, newRules := bundle.Rules[:0] for i, rule := range bundle.Rules { // merge all empty constraints - if len(rule.LabelConstraints) == 0 { + if len(rule.Constraints) == 0 { extraCnt[rule.Role] += rule.Count continue } // refer to tidb#22065. // add -engine=tiflash to every rule to avoid schedules to tiflash instances. // placement rules in SQL is not compatible with `set tiflash replica` yet - if err := rule.LabelConstraints.Add(placement.Constraint{ + if err := rule.Constraints.Add(placement.Constraint{ Op: placement.NotIn, Key: placement.EngineLabelKey, Values: []string{placement.EngineLabelTiFlash}, @@ -6083,7 +6083,7 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, Count: cnt, StartKeyHex: startKey, EndKeyHex: endKey, - LabelConstraints: []placement.Constraint{{ + Constraints: []placement.Constraint{{ Op: placement.NotIn, Key: placement.EngineLabelKey, Values: []string{placement.EngineLabelTiFlash}, diff --git a/ddl/placement/errors.go b/ddl/placement/errors.go index 19797022a609c..95fce4591c961 100644 --- a/ddl/placement/errors.go +++ b/ddl/placement/errors.go @@ -24,4 +24,10 @@ var ( ErrUnsupportedConstraint = errors.New("unsupported label constraint") // ErrConflictingConstraints is from constraints.go. ErrConflictingConstraints = errors.New("conflicting label constraints") + // ErrInvalidConstraintsMapcnt is from rule.go. + ErrInvalidConstraintsMapcnt = errors.New("label constraints in map syntax have invalid replicas") + // ErrInvalidConstraintsFormat is from rule.go. + ErrInvalidConstraintsFormat = errors.New("invalid label constraints format") + // ErrInvalidConstraintsRelicas is from rule.go. + ErrInvalidConstraintsRelicas = errors.New("label constraints with invalid REPLICAS") ) diff --git a/ddl/placement/rule.go b/ddl/placement/rule.go new file mode 100644 index 0000000000000..134bdd5a610f9 --- /dev/null +++ b/ddl/placement/rule.go @@ -0,0 +1,132 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "fmt" + "strings" + + "github.com/go-yaml/yaml" +) + +// PeerRoleType is the expected peer type of the placement rule. +type PeerRoleType string + +const ( + // Voter can either match a leader peer or follower peer. + Voter PeerRoleType = "voter" + // Leader matches a leader. + Leader PeerRoleType = "leader" + // Follower matches a follower. + Follower PeerRoleType = "follower" + // Learner matches a learner. + Learner PeerRoleType = "learner" +) + +// Rule is the core placement rule struct. Check https://github.com/tikv/pd/blob/master/server/schedule/placement/rule.go. +type Rule struct { + GroupID string `json:"group_id"` + ID string `json:"id"` + Index int `json:"index,omitempty"` + Override bool `json:"override,omitempty"` + StartKeyHex string `json:"start_key"` + EndKeyHex string `json:"end_key"` + Role PeerRoleType `json:"role"` + Count int `json:"count"` + Constraints Constraints `json:"label_constraints,omitempty"` + LocationLabels []string `json:"location_labels,omitempty"` + IsolationLevel string `json:"isolation_level,omitempty"` +} + +// NewRules constructs []*Rule from a yaml-compatible representation of +// array or map of constraints. It converts 'CONSTRAINTS' field in RFC +// https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-24-placement-rules-in-sql.md to structs. +func NewRules(replicas uint64, cnstr string) ([]*Rule, error) { + rules := []*Rule{} + + cnstbytes := []byte(cnstr) + + constraints1 := []string{} + err1 := yaml.UnmarshalStrict(cnstbytes, &constraints1) + if err1 == nil { + // can not emit REPLICAS with an array or empty label + if replicas == 0 { + return rules, fmt.Errorf("%w: should be positive", ErrInvalidConstraintsRelicas) + } + + labelConstraints, err := NewConstraints(constraints1) + if err != nil { + return rules, err + } + + rules = append(rules, &Rule{ + Count: int(replicas), + Constraints: labelConstraints, + }) + + return rules, nil + } + + constraints2 := map[string]int{} + err2 := yaml.UnmarshalStrict(cnstbytes, &constraints2) + if err2 == nil { + ruleCnt := 0 + for labels, cnt := range constraints2 { + if cnt <= 0 { + return rules, fmt.Errorf("%w: count of labels '%s' should be positive, but got %d", ErrInvalidConstraintsMapcnt, labels, cnt) + } + ruleCnt += cnt + } + + if replicas == 0 { + replicas = uint64(ruleCnt) + } + + if int(replicas) < ruleCnt { + return rules, fmt.Errorf("%w: should be larger or equal to the number of total replicas, but REPLICAS=%d < total=%d", ErrInvalidConstraintsRelicas, replicas, ruleCnt) + } + + for labels, cnt := range constraints2 { + labelConstraints, err := NewConstraints(strings.Split(labels, ",")) + if err != nil { + return rules, err + } + + rules = append(rules, &Rule{ + Count: cnt, + Constraints: labelConstraints, + }) + } + + remain := int(replicas) - ruleCnt + if remain > 0 { + rules = append(rules, &Rule{ + Count: remain, + }) + } + + return rules, nil + } + + return nil, fmt.Errorf("%w: should be [constraint1, ...] 
(error %s), {constraint1: cnt1, ...} (error %s), or any yaml compatible representation", ErrInvalidConstraintsFormat, err1, err2) +} + +// Clone is used to duplicate a RuleOp for safe modification. +// Note that it is a shallow copy: LocationLabels and Constraints +// is not cloned. +func (r *Rule) Clone() *Rule { + n := &Rule{} + *n = *r + return n +} diff --git a/ddl/placement/rule_test.go b/ddl/placement/rule_test.go new file mode 100644 index 0000000000000..85dd492f348e7 --- /dev/null +++ b/ddl/placement/rule_test.go @@ -0,0 +1,206 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package placement + +import ( + "encoding/json" + "errors" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testRuleSuite{}) + +type testRuleSuite struct{} + +func (t *testRuleSuite) TestClone(c *C) { + rule := &Rule{ID: "434"} + newRule := rule.Clone() + newRule.ID = "121" + + c.Assert(rule, DeepEquals, &Rule{ID: "434"}) + c.Assert(newRule, DeepEquals, &Rule{ID: "121"}) +} + +func matchRule(r1 *Rule, t2 []*Rule) bool { + for _, r2 := range t2 { + if ok, _ := DeepEquals.Check([]interface{}{r1, r2}, nil); ok { + return true + } + } + return false +} + +func matchRules(t1, t2 []*Rule, prefix string, c *C) { + expected, err := json.Marshal(t1) + c.Assert(err, IsNil) + got, err := json.Marshal(t2) + c.Assert(err, IsNil) + comment := Commentf("%s, expected %s\nbut got %s", prefix, expected, got) + c.Assert(len(t1), Equals, len(t2), comment) + for _, r1 := range t1 { + c.Assert(matchRule(r1, t2), IsTrue, comment) + } +} + +func (t *testRuleSuite) TestNewRules(c *C) { + type TestCase struct { + name string + input string + replicas uint64 + output []*Rule + err error + } + tests := []TestCase{} + + tests = append(tests, TestCase{ + name: "empty constraints", + input: "", + replicas: 3, + output: []*Rule{ + { + Count: 3, + Constraints: Constraints{}, + }, + }, + }) + + tests = append(tests, TestCase{ + name: "zero replicas", + input: "", + replicas: 0, + err: ErrInvalidConstraintsRelicas, + }) + + labels, err := NewConstraints([]string{"+zone=sh", "+zone=sh"}) + c.Assert(err, IsNil) + tests = append(tests, TestCase{ + name: "normal array constraints", + input: `["+zone=sh", "+zone=sh"]`, + replicas: 3, + output: []*Rule{ + { + Count: 3, + Constraints: labels, + }, + }, + }) + + labels1, err := NewConstraints([]string{"+zone=sh", "-zone=bj"}) + c.Assert(err, IsNil) + labels2, err := NewConstraints([]string{"+zone=sh"}) + c.Assert(err, IsNil) + tests = append(tests, TestCase{ + name: "normal object constraints", + input: `{"+zone=sh,-zone=bj":2, "+zone=sh": 1}`, + replicas: 3, + output: []*Rule{ + { + Count: 2, + Constraints: labels1, + }, + { + Count: 1, + Constraints: labels2, + }, + }, + }) + + tests = append(tests, TestCase{ + name: "normal object constraints, with extra count", + input: "{'+zone=sh,-zone=bj':2, '+zone=sh': 1}", + replicas: 4, + output: []*Rule{ + { + Count: 2, + Constraints: labels1, + }, + { + Count: 1, + Constraints: labels2, + }, + { + Count: 1, + }, + }, + }) + + tests = append(tests, TestCase{ + name: "normal 
object constraints, without count", + input: "{'+zone=sh,-zone=bj':2, '+zone=sh': 1}", + output: []*Rule{ + { + Count: 2, + Constraints: labels1, + }, + { + Count: 1, + Constraints: labels2, + }, + }, + }) + + tests = append(tests, TestCase{ + name: "zero count in object constraints", + input: `{"+zone=sh,-zone=bj":0, "+zone=sh": 1}`, + replicas: 3, + err: ErrInvalidConstraintsMapcnt, + }) + + tests = append(tests, TestCase{ + name: "overlarge total count in object constraints", + input: `{"+ne=sh,-zone=bj":1, "+zone=sh": 4}`, + replicas: 3, + err: ErrInvalidConstraintsRelicas, + }) + + tests = append(tests, TestCase{ + name: "invalid array", + input: `["+ne=sh", "+zone=sh"`, + replicas: 3, + err: ErrInvalidConstraintsFormat, + }) + + tests = append(tests, TestCase{ + name: "invalid array constraints", + input: `["ne=sh", "+zone=sh"]`, + replicas: 3, + err: ErrInvalidConstraintFormat, + }) + + tests = append(tests, TestCase{ + name: "invalid map", + input: `{+ne=sh,-zone=bj:1, "+zone=sh": 4`, + replicas: 5, + err: ErrInvalidConstraintsFormat, + }) + + tests = append(tests, TestCase{ + name: "invalid map constraints", + input: `{"nesh,-zone=bj":1, "+zone=sh": 4}`, + replicas: 6, + err: ErrInvalidConstraintFormat, + }) + + for _, t := range tests { + comment := Commentf("%s", t.name) + output, err := NewRules(t.replicas, t.input) + if t.err == nil { + c.Assert(err, IsNil, comment) + matchRules(t.output, output, comment.CheckCommentString(), c) + } else { + c.Assert(errors.Is(err, t.err), IsTrue, comment) + } + } +} diff --git a/ddl/placement/types.go b/ddl/placement/types.go index 3bb9da96e3890..72093a2c19c78 100644 --- a/ddl/placement/types.go +++ b/ddl/placement/types.go @@ -22,42 +22,6 @@ import ( // After all, placement rules are communicated using an HTTP API. Loose // coupling is a good feature. -// PeerRoleType is the expected peer type of the placement rule. -type PeerRoleType string - -const ( - // Voter can either match a leader peer or follower peer. - Voter PeerRoleType = "voter" - // Leader matches a leader. - Leader PeerRoleType = "leader" - // Follower matches a follower. - Follower PeerRoleType = "follower" - // Learner matches a learner. - Learner PeerRoleType = "learner" -) - -// Rule is the placement rule. Check https://github.com/tikv/pd/blob/master/server/schedule/placement/rule.go. -type Rule struct { - GroupID string `json:"group_id"` - ID string `json:"id"` - Index int `json:"index,omitempty"` - Override bool `json:"override,omitempty"` - StartKeyHex string `json:"start_key"` - EndKeyHex string `json:"end_key"` - Role PeerRoleType `json:"role"` - Count int `json:"count"` - LabelConstraints Constraints `json:"label_constraints,omitempty"` - LocationLabels []string `json:"location_labels,omitempty"` - IsolationLevel string `json:"isolation_level,omitempty"` -} - -// Clone is used to duplicate a RuleOp for safe modification. -func (r *Rule) Clone() *Rule { - n := &Rule{} - *n = *r - return n -} - // Bundle is a group of all rules and configurations. It is used to support rule cache. 
type Bundle struct { ID string `json:"group_id"` diff --git a/ddl/placement/types_test.go b/ddl/placement/types_test.go index 77153cb29b692..93ed1a5a80f43 100644 --- a/ddl/placement/types_test.go +++ b/ddl/placement/types_test.go @@ -18,7 +18,6 @@ import ( ) var _ = Suite(&testBundleSuite{}) -var _ = Suite(&testRuleSuite{}) type testBundleSuite struct{} @@ -49,14 +48,3 @@ func (t *testBundleSuite) TestClone(c *C) { c.Assert(bundle, DeepEquals, &Bundle{ID: GroupID(1), Rules: []*Rule{{ID: "434"}}}) c.Assert(newBundle, DeepEquals, &Bundle{ID: GroupID(2), Rules: []*Rule{{ID: "121"}}}) } - -type testRuleSuite struct{} - -func (t *testRuleSuite) TestClone(c *C) { - rule := &Rule{ID: "434"} - newRule := rule.Clone() - newRule.ID = "121" - - c.Assert(rule, DeepEquals, &Rule{ID: "434"}) - c.Assert(newRule, DeepEquals, &Rule{ID: "121"}) -} diff --git a/ddl/placement/utils.go b/ddl/placement/utils.go index 16c0a424dde53..5b12f10e2d243 100644 --- a/ddl/placement/utils.go +++ b/ddl/placement/utils.go @@ -61,7 +61,7 @@ func BuildPlacementCopyBundle(oldBundle *Bundle, newID int64) *Bundle { func GetLeaderDCByBundle(bundle *Bundle, dcLabelKey string) (string, bool) { for _, rule := range bundle.Rules { if isValidLeaderRule(rule, dcLabelKey) { - return rule.LabelConstraints[0].Values[0], true + return rule.Constraints[0].Values[0], true } } return "", false @@ -69,7 +69,7 @@ func GetLeaderDCByBundle(bundle *Bundle, dcLabelKey string) (string, bool) { func isValidLeaderRule(rule *Rule, dcLabelKey string) bool { if rule.Role == Leader && rule.Count == 1 { - for _, con := range rule.LabelConstraints { + for _, con := range rule.Constraints { if con.Op == In && con.Key == dcLabelKey && len(con.Values) == 1 { return true } diff --git a/ddl/placement/utils_test.go b/ddl/placement/utils_test.go index 964382846485e..10941e0663455 100644 --- a/ddl/placement/utils_test.go +++ b/ddl/placement/utils_test.go @@ -58,7 +58,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "12", Role: Leader, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: In, @@ -84,7 +84,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "12", Role: Voter, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: In, @@ -110,7 +110,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "11", Role: Leader, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: In, @@ -127,7 +127,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "12", Role: Voter, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: In, @@ -153,7 +153,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "11", Role: Leader, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "fake", Op: In, @@ -179,7 +179,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "11", Role: Leader, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: NotIn, @@ -205,7 +205,7 @@ func (t *testUtilsSuite) TestGetLeaderDCByBundle(c *C) { { ID: "11", Role: Leader, - LabelConstraints: []Constraint{ + Constraints: []Constraint{ { Key: "zone", Op: In, diff --git a/ddl/placement_rule_test.go b/ddl/placement_rule_test.go index b051092a776e9..a9a916cb5a199 100644 --- a/ddl/placement_rule_test.go +++ b/ddl/placement_rule_test.go @@ -52,7 +52,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 3, - LabelConstraints: 
[]placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, }, }, @@ -67,9 +67,9 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { Constraints: "", }}, output: []*placement.Rule{{ - Role: placement.Voter, - Count: 3, - LabelConstraints: []placement.Constraint{}, + Role: placement.Voter, + Count: 3, + Constraints: []placement.Constraint{}, }}, }, @@ -83,14 +83,14 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 1, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, }, }, { Role: placement.Voter, Count: 2, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, }, }, @@ -108,7 +108,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "notIn", Values: []string{"sh"}}, {Key: "zone", Op: "notIn", Values: []string{"bj"}}, }, @@ -127,7 +127,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, {Key: "zone", Op: "notIn", Values: []string{"bj"}}, }, @@ -154,7 +154,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, {Key: "zone", Op: "notIn", Values: []string{"bj"}}, }, @@ -162,7 +162,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Follower, Count: 2, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "notIn", Values: []string{"sh"}}, {Key: "zone", Op: "in", Values: []string{"bj"}}, }, @@ -189,7 +189,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 2, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "notIn", Values: []string{"sh"}}, {Key: "zone", Op: "in", Values: []string{"bj"}}, }, @@ -214,14 +214,14 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { }, output: []*placement.Rule{ { - Role: placement.Voter, - Count: 1, - LabelConstraints: []placement.Constraint{{Key: "zone", Op: "notIn", Values: []string{"sh"}}}, + Role: placement.Voter, + Count: 1, + Constraints: []placement.Constraint{{Key: "zone", Op: "notIn", Values: []string{"sh"}}}, }, { - Role: placement.Voter, - Count: 1, - LabelConstraints: []placement.Constraint{{Key: "zone", Op: "in", Values: []string{"bj"}}}, + Role: placement.Voter, + Count: 1, + Constraints: []placement.Constraint{{Key: "zone", Op: "in", Values: []string{"bj"}}}, }, { Role: placement.Voter, @@ -306,7 +306,7 @@ func (s *testPlacementSuite) TestPlacementBuild(c *C) { { Role: placement.Voter, Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ {Key: "zone", Op: "in", Values: []string{"sh"}}, {Key: "zone", Op: "notIn", Values: []string{"bj"}}, }, diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index e77b0ba99d5cf..fb7158681714f 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -404,7 +404,7 @@ PARTITION BY RANGE (c) ( GroupID: groupID, Role: placement.Leader, Count: 1, 
- LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: placement.DCLabelKey, Op: placement.In, @@ -423,7 +423,7 @@ PARTITION BY RANGE (c) ( GroupID: groupID, Role: placement.Follower, Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: placement.DCLabelKey, Op: placement.In, @@ -619,7 +619,7 @@ PARTITION BY RANGE (c) ( GroupID: groupID, Role: placement.Leader, Count: 1, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: placement.DCLabelKey, Op: placement.In, diff --git a/executor/executor_test.go b/executor/executor_test.go index 65af164174e6f..d67bb6b48b8f4 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -350,7 +350,14 @@ func (s *testSuiteP1) TestShow(c *C) { "Trigger Tables To use triggers", "Create tablespace Server Admin To create/alter/drop tablespaces", "Update Tables To update existing rows", - "Usage Server Admin No privileges - allow connect only")) + "Usage Server Admin No privileges - allow connect only", + "BACKUP_ADMIN Server Admin ", + "SYSTEM_VARIABLES_ADMIN Server Admin ", + "ROLE_ADMIN Server Admin ", + "CONNECTION_ADMIN Server Admin ", + "RESTRICTED_TABLES_ADMIN Server Admin ", + "RESTRICTED_STATUS_ADMIN Server Admin ", + )) c.Assert(len(tk.MustQuery("show table status").Rows()), Equals, 1) } diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 4d56cc55accac..4f788a3d7bd1d 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -1915,7 +1915,7 @@ func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) er continue } for _, rule := range bundle.Rules { - constraint, err := rule.LabelConstraints.Restore() + constraint, err := rule.Constraints.Restore() if err != nil { return errors.Wrapf(err, "Restore rule %s in bundle %s failed", rule.ID, bundle.ID) } diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index ab5a19ca823e9..df0c52cfb55cc 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -227,6 +227,85 @@ func (s *partitionTableSuite) TestPartitionInfoDisable(c *C) { tk.MustQuery("select * from t_info_null where (date = '2020-10-02' or date = '2020-10-06') and app = 'xxx' and media = '19003006'").Check(testkit.Rows()) } +func (s *partitionTableSuite) TestOrderByandLimit(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_orderby_limit") + tk.MustExec("use test_orderby_limit") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + // range partition table + tk.MustExec(`create table trange(a int, b int, index idx_a(a)) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table + tk.MustExec("create table thash(a int, b int, index idx_a(a), index idx_b(b)) partition by hash(a) partitions 4;") + + // regular table + tk.MustExec("create table tregular(a int, b int, index idx_a(a))") + + // generate some random data to be inserted + vals := make([]string, 0, 2000) + for i := 0; i < 2000; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000))) + } + tk.MustExec("insert into trange values " + strings.Join(vals, ",")) + tk.MustExec("insert into thash values " + strings.Join(vals, ",")) + tk.MustExec("insert into tregular values 
" + strings.Join(vals, ",")) + + // test indexLookUp + for i := 0; i < 100; i++ { + // explain select * from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select * from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryPartition := fmt.Sprintf("select * from trange use index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v order by a, b limit %v;", x, y) + c.Assert(tk.HasPlan(queryPartition, "IndexLookUp"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + } + + // test tableReader + for i := 0; i < 100; i++ { + // explain select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select * from t where a > {y} ignore index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryPartition := fmt.Sprintf("select * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y) + c.Assert(tk.HasPlan(queryPartition, "TableReader"), IsTrue) // check if tableReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + } + + // test indexReader + for i := 0; i < 100; i++ { + // explain select a from t where a > {y} use index(idx_a) order by a limit {x}; // check if IndexLookUp is used + // select a from t where a > {y} use index(idx_a) order by a limit {x}; // it can return the correct result + x := rand.Intn(1099) + y := rand.Intn(2000) + 1 + queryPartition := fmt.Sprintf("select a from trange use index(idx_a) where a > %v order by a limit %v;", x, y) + queryRegular := fmt.Sprintf("select a from tregular use index(idx_a) where a > %v order by a limit %v;", x, y) + c.Assert(tk.HasPlan(queryPartition, "IndexReader"), IsTrue) // check if indexReader is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + } + + // test indexMerge + for i := 0; i < 100; i++ { + // explain select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // check if IndexMerge is used + // select /*+ use_index_merge(t) */ * from t where a > 2 or b < 5 order by a limit {x}; // can return the correct value + y := rand.Intn(2000) + 1 + queryPartition := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > 2 or b < 5 order by a, b limit %v;", y) + queryRegular := fmt.Sprintf("select * from tregular where a > 2 or b < 5 order by a, b limit %v;", y) + c.Assert(tk.HasPlan(queryPartition, "IndexMerge"), IsTrue) // check if indexMerge is used + tk.MustQuery(queryPartition).Sort().Check(tk.MustQuery(queryRegular).Sort().Rows()) + } +} + func (s *partitionTableSuite) TestBatchGetandPointGetwithHashPartition(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test") diff --git a/executor/show.go b/executor/show.go index 2bd9b786fcffb..ab08e5ba4cbf5 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1412,6 +1412,10 @@ func (e *ShowExec) fetchShowPrivileges() error { e.appendRow([]interface{}{"Create tablespace", "Server Admin", "To create/alter/drop tablespaces"}) e.appendRow([]interface{}{"Update", "Tables", "To update existing rows"}) 
e.appendRow([]interface{}{"Usage", "Server Admin", "No privileges - allow connect only"}) + + for _, priv := range privileges.GetDynamicPrivileges() { + e.appendRow([]interface{}{priv, "Server Admin", ""}) + } return nil } diff --git a/executor/write_test.go b/executor/write_test.go index 27ea70ae748a5..b832e52a9935c 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1554,7 +1554,7 @@ func (s *testSuite8) TestUpdate(c *C) { _, err = tk.Exec("UPDATE t SET c2=16777215 WHERE c1>= -8388608 AND c1 < -9 ORDER BY c1 LIMIT 2") c.Assert(err, IsNil) - tk.MustExec("update (select * from t) t set c1 = 1111111") + tk.MustGetErrCode("update (select * from t) t set c1 = 1111111", mysql.ErrNonUpdatableTable) // test update ignore for bad null error tk.MustExec("drop table if exists t;") @@ -1604,8 +1604,7 @@ func (s *testSuite8) TestUpdate(c *C) { tk.MustExec("drop view v") tk.MustExec("create sequence seq") - _, err = tk.Exec("update seq set minvalue=1") - c.Assert(err.Error(), Equals, "update sequence seq is not supported now.") + tk.MustGetErrCode("update seq set minvalue=1", mysql.ErrBadField) tk.MustExec("drop sequence seq") tk.MustExec("drop table if exists t1, t2") diff --git a/expression/integration_test.go b/expression/integration_test.go index 8e0f70823f327..80e39b76ce746 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -8755,7 +8755,7 @@ PARTITION BY RANGE (c) ( GroupID: groupID, Role: placement.Leader, Count: 1, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: placement.DCLabelKey, Op: placement.In, @@ -9086,9 +9086,15 @@ func (s *testIntegrationSuite) TestEnumPushDown(c *C) { func (s *testIntegrationSuite) TestJiraSetInnoDBDefaultRowFormat(c *C) { // For issue #23541 // JIRA needs to be able to set this to be happy. 
+ // See: https://nova.moe/run-jira-on-tidb/ tk := testkit.NewTestKit(c, s.store) tk.MustExec("set global innodb_default_row_format = dynamic") tk.MustExec("set global innodb_default_row_format = 'dynamic'") + tk.MustQuery("SHOW VARIABLES LIKE 'innodb_default_row_format'").Check(testkit.Rows("innodb_default_row_format dynamic")) + tk.MustQuery("SHOW VARIABLES LIKE 'character_set_server'").Check(testkit.Rows("character_set_server utf8mb4")) + tk.MustQuery("SHOW VARIABLES LIKE 'innodb_file_format'").Check(testkit.Rows("innodb_file_format Barracuda")) + tk.MustQuery("SHOW VARIABLES LIKE 'innodb_large_prefix'").Check(testkit.Rows("innodb_large_prefix ON")) + } func (s *testIntegrationSerialSuite) TestCollationForBinaryLiteral(c *C) { @@ -9344,4 +9350,14 @@ func (s *testIntegrationSuite) TestEnumIndex(c *C) { testkit.Rows( "OFJHCEKCQGT:MXI7P3[YO4N0DF=2XJWJ4Z9Z;HQ8TMUTZV8YLQAHWJ4BDZHR3A -30 ", "ZOHBSCRMZPOI`IVTSEZAIDAF7DS@1TT20AP9 -30 ")) + + // issue 24576 + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(col1 enum('a','b','c'), col2 enum('a','b','c'), col3 int, index idx(col1,col2));") + tk.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3);") + tk.MustQuery("select /*+ use_index(t,idx) */ col3 from t where col2 between 'b' and 'b' and col1 is not null;").Check( + testkit.Rows("2")) + tk.MustQuery("select /*+ use_index(t,idx) */ col3 from t where col2 = 'b' and col1 is not null;").Check( + testkit.Rows("2")) } diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 6cc24300c1be4..ebe4a0620256f 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -1449,7 +1449,7 @@ func (s *testTableSuite) TestPlacementPolicy(c *C) { ID: "0", Role: "voter", Count: 3, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: "zone", Op: "in", diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 9e8eaa9204af9..62e5b032c86df 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -196,6 +196,20 @@ func (s *testIntegrationSuite) TestIssue22298(c *C) { tk.MustGetErrMsg(`select * from t where 0 and c = 10;`, "[planner:1054]Unknown column 'c' in 'where clause'") } +func (s *testIntegrationSuite) TestIssue24571(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec(`create view v as select 1 as b;`) + tk.MustExec(`create table t (a int);`) + tk.MustExec(`update v, t set a=2;`) + tk.MustGetErrCode(`update v, t set b=2;`, mysql.ErrNonUpdatableTable) + tk.MustExec("create database db1") + tk.MustExec("use db1") + tk.MustExec("update test.t, (select 1 as a) as t set test.t.a=1;") + // bug in MySQL: ERROR 1288 (HY000): The target table t of the UPDATE is not updatable + tk.MustExec("update (select 1 as a) as t, test.t set test.t.a=1;") +} + func (s *testIntegrationSuite) TestIssue22828(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 7ff0e2ac6c6aa..64bc0c41407e1 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -4239,17 +4239,6 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( b.popTableHints() }() - // update subquery table should be forbidden - var notUpdatableTbl []string - notUpdatableTbl = extractTableSourceAsNames(update.TableRefs.TableRefs, notUpdatableTbl, true) - for _, asName := range notUpdatableTbl { - for 
_, assign := range update.List { - if assign.Column.Table.L == asName { - return nil, ErrNonUpdatableTable.GenWithStackByArgs(asName, "UPDATE") - } - } - } - b.inUpdateStmt = true b.isForUpdateRead = true @@ -4265,12 +4254,6 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( if dbName == "" { dbName = b.ctx.GetSessionVars().CurrentDB } - if t.TableInfo.IsView() { - return nil, errors.Errorf("update view %s is not supported now.", t.Name.O) - } - if t.TableInfo.IsSequence() { - return nil, errors.Errorf("update sequence %s is not supported now.", t.Name.O) - } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, dbName, t.Name.L, "", nil) } @@ -4314,6 +4297,10 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( proj.SetChildren(p) p = proj + // update subquery table should be forbidden + var notUpdatableTbl []string + notUpdatableTbl = extractTableSourceAsNames(update.TableRefs.TableRefs, notUpdatableTbl, true) + var updateTableList []*ast.TableName updateTableList = extractTableList(update.TableRefs.TableRefs, updateTableList, true) orderedList, np, allAssignmentsAreConstant, err := b.buildUpdateLists(ctx, updateTableList, update.List, p, notUpdatableTbl) @@ -4417,6 +4404,21 @@ func (b *PlanBuilder) buildUpdateLists(ctx context.Context, tableList []*ast.Tab columnsIdx[assign.Column] = idx } name := p.OutputNames()[idx] + for _, tl := range tableList { + if (tl.Schema.L == "" || tl.Schema.L == name.DBName.L) && (tl.Name.L == name.TblName.L) { + if tl.TableInfo.IsView() || tl.TableInfo.IsSequence() { + return nil, nil, false, ErrNonUpdatableTable.GenWithStackByArgs(name.TblName.O, "UPDATE") + } + // may be a subquery + if tl.Schema.L == "" { + for _, nTbl := range notUpdatableTbl { + if nTbl == name.TblName.L { + return nil, nil, false, ErrNonUpdatableTable.GenWithStackByArgs(name.TblName.O, "UPDATE") + } + } + } + } + } columnFullName := fmt.Sprintf("%s.%s.%s", name.DBName.L, name.TblName.L, name.ColName.L) // We save a flag for the column in map `modifyColumns` // This flag indicated if assign keyword `DEFAULT` to the column @@ -4439,9 +4441,10 @@ func (b *PlanBuilder) buildUpdateLists(ctx context.Context, tableList []*ast.Tab break } } - if !updatable { + if !updatable || tn.TableInfo.IsView() || tn.TableInfo.IsSequence() { continue } + tableInfo := tn.TableInfo tableVal, found := b.is.TableByID(tableInfo.ID) if !found { diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 11a116bb4fac8..921f1c99b34ec 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -1459,7 +1459,6 @@ func (s *testPlanSuite) TestNameResolver(c *C) { {"delete a from (select * from t ) as a, t", "[planner:1288]The target table a of the DELETE is not updatable"}, {"delete b from (select * from t ) as a, t", "[planner:1109]Unknown table 'b' in MULTI DELETE"}, {"select '' as fakeCol from t group by values(fakeCol)", "[planner:1054]Unknown column '' in 'VALUES() function'"}, - {"update t, (select * from t) as b set b.a = t.a", "[planner:1288]The target table b of the UPDATE is not updatable"}, {"select row_number() over () from t group by 1", "[planner:1056]Can't group on 'row_number() over ()'"}, {"select row_number() over () as x from t group by 1", "[planner:1056]Can't group on 'x'"}, {"select sum(a) as x from t group by 1", "[planner:1056]Can't group on 'x'"}, diff --git a/privilege/privileges/privileges.go b/privilege/privileges/privileges.go index 
c5ec2f8394385..6ac58e04e44e4 100644 --- a/privilege/privileges/privileges.go +++ b/privilege/privileges/privileges.go @@ -535,3 +535,14 @@ func RegisterDynamicPrivilege(privNameInUpper string) error { dynamicPrivs = append(dynamicPrivs, privNameInUpper) return nil } + +// GetDynamicPrivileges returns the list of registered DYNAMIC privileges +// for use in meta data commands (i.e. SHOW PRIVILEGES) +func GetDynamicPrivileges() []string { + dynamicPrivLock.Lock() + defer dynamicPrivLock.Unlock() + + privCopy := make([]string, len(dynamicPrivs)) + copy(privCopy, dynamicPrivs) + return privCopy +} diff --git a/privilege/privileges/privileges_test.go b/privilege/privileges/privileges_test.go index 3038aad397076..c7a825a4d894d 100644 --- a/privilege/privileges/privileges_test.go +++ b/privilege/privileges/privileges_test.go @@ -1427,3 +1427,15 @@ func (s *testPrivilegeSuite) TestViewDefiner(c *C) { tk.MustExec("select * from test_view") tk.MustExec("select * from test_view2") } + +func (s *testPrivilegeSuite) TestDynamicPrivsRegistration(c *C) { + se := newSession(c, s.store, s.dbName) + pm := privilege.GetPrivilegeManager(se) + + count := len(privileges.GetDynamicPrivileges()) + + c.Assert(pm.IsDynamicPrivilege("ACDC_ADMIN"), IsFalse) + privileges.RegisterDynamicPrivilege("ACDC_ADMIN") + c.Assert(pm.IsDynamicPrivilege("ACDC_ADMIN"), IsTrue) + c.Assert(len(privileges.GetDynamicPrivileges()), Equals, count+1) +} diff --git a/session/session_test.go b/session/session_test.go index 97a416466d930..4870215f33c9e 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3318,7 +3318,7 @@ PARTITION BY RANGE (c) ( GroupID: groupID, Role: placement.Leader, Count: 1, - LabelConstraints: []placement.Constraint{ + Constraints: []placement.Constraint{ { Key: placement.DCLabelKey, Op: placement.In, diff --git a/sessionctx/variable/noop.go b/sessionctx/variable/noop.go index c510d9c73ce3a..1ad37b512d807 100644 --- a/sessionctx/variable/noop.go +++ b/sessionctx/variable/noop.go @@ -312,7 +312,7 @@ var noopSysVars = []*SysVar{ {Scope: ScopeNone, Name: "datetime_format", Value: "%Y-%m-%d %H:%i:%s"}, {Scope: ScopeGlobal, Name: "log_syslog", Value: ""}, {Scope: ScopeGlobal | ScopeSession, Name: "transaction_alloc_block_size", Value: "8192"}, - {Scope: ScopeGlobal, Name: "innodb_large_prefix", Type: TypeBool, Value: Off}, + {Scope: ScopeGlobal, Name: "innodb_large_prefix", Type: TypeBool, Value: On}, {Scope: ScopeNone, Name: "performance_schema_max_cond_classes", Value: "80"}, {Scope: ScopeGlobal, Name: "innodb_io_capacity", Value: "200"}, {Scope: ScopeGlobal, Name: "max_binlog_cache_size", Value: "18446744073709547520"}, diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 6d20a9cd0d15e..7f05f80139c12 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -170,8 +170,6 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetIsStatenessReadOnly(val.(bool)) case kv.MatchStoreLabels: txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) - default: - txn.KVTxn.SetOption(opt, val) } } @@ -182,7 +180,7 @@ func (txn *tikvTxn) GetOption(opt int) interface{} { case kv.TxnScope: return txn.KVTxn.GetScope() default: - return txn.KVTxn.GetOption(opt) + return nil } } @@ -190,8 +188,6 @@ func (txn *tikvTxn) DelOption(opt int) { switch opt { case kv.CollectRuntimeStats: txn.KVTxn.GetSnapshot().SetRuntimeStats(nil) - default: - txn.KVTxn.DelOption(opt) } } diff --git a/store/gcworker/gc_worker.go 
b/store/gcworker/gc_worker.go index e0aa993558b6c..b408f279be98a 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -409,12 +409,21 @@ func (w *GCWorker) calcSafePointByMinStartTS(ctx context.Context, safePoint uint return safePoint } - if globalMinStartTS < safePoint { + // If the lock.ts <= max_ts(safePoint), it will be collected and resolved by the gc worker, + // the locks of ongoing pessimistic transactions could be resolved by the gc worker and then + // the transaction is aborted, decrement the value by 1 to avoid this. + globalMinStartAllowedTS := globalMinStartTS + if globalMinStartTS > 0 { + globalMinStartAllowedTS = globalMinStartTS - 1 + } + + if globalMinStartAllowedTS < safePoint { logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session", zap.String("uuid", w.uuid), zap.Uint64("globalMinStartTS", globalMinStartTS), + zap.Uint64("globalMinStartAllowedTS", globalMinStartAllowedTS), zap.Uint64("safePoint", safePoint)) - safePoint = globalMinStartTS + safePoint = globalMinStartAllowedTS } return safePoint } diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index bc09651e0d379..39abe369f82fb 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -262,7 +262,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) { strconv.FormatUint(now-oracle.EncodeTSO(20000), 10)) c.Assert(err, IsNil) sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000)) - c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)) + c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)-1) } func (s *testGCWorkerSuite) TestPrepareGC(c *C) { @@ -1589,3 +1589,52 @@ func (s *testGCWorkerSuite) TestGCPlacementRules(c *C) { c.Assert(pid, Equals, int64(1)) c.Assert(err, IsNil) } + +func (s *testGCWorkerSuite) TestGCWithPendingTxn(c *C) { + ctx := context.Background() + gcSafePointCacheInterval = 0 + err := s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) + c.Assert(err, IsNil) + + k1 := []byte("tk1") + v1 := []byte("v1") + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.SetOption(kv.Pessimistic, true) + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + + // Lock the key. + err = txn.Set(k1, v1) + c.Assert(err, IsNil) + err = txn.LockKeys(ctx, lockCtx, k1) + c.Assert(err, IsNil) + + // Prepare to run gc with txn's startTS as the safepoint ts. + spkv := s.tikvStore.GetSafePointKV() + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(txn.StartTS(), 10)) + c.Assert(err, IsNil) + s.mustSetTiDBServiceSafePoint(c, txn.StartTS(), txn.StartTS()) + veryLong := gcDefaultLifeTime * 100 + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) + c.Assert(err, IsNil) + s.gcWorker.lastFinish = time.Now().Add(-veryLong) + s.oracle.AddOffset(time.Minute * 10) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) + c.Assert(err, IsNil) + + // Trigger the tick let the gc job start. 
+ err = s.gcWorker.leaderTick(ctx) + c.Assert(err, IsNil) + // Wait for GC finish + select { + case err = <-s.gcWorker.done: + s.gcWorker.gcIsRunning = false + break + case <-time.After(time.Second * 10): + err = errors.New("receive from s.gcWorker.done timeout") + } + c.Assert(err, IsNil) + + err = txn.Commit(ctx) + c.Assert(err, IsNil) +} diff --git a/store/tikv/commit.go b/store/tikv/commit.go index 449081860c029..10c60d9f6d4bd 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -48,7 +48,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch CommitVersion: c.commitTS, }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) - sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) // If we fail to receive response for the request that commits primary key, it will be undetermined whether this diff --git a/store/tikv/kv.go b/store/tikv/kv.go index f61db4168ef7d..bbf8517a42a8c 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -66,10 +66,13 @@ var oracleUpdateInterval = 2000 // KVStore contains methods to interact with a TiKV cluster. type KVStore struct { - clusterID uint64 - uuid string - oracle oracle.Oracle - client Client + clusterID uint64 + uuid string + oracle oracle.Oracle + clientMu struct { + sync.RWMutex + client Client + } pdClient pd.Client regionCache *RegionCache lockResolver *LockResolver @@ -133,7 +136,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, oracle: o, - client: reqCollapse{client}, pdClient: pdClient, regionCache: NewRegionCache(pdClient), kv: spkv, @@ -142,6 +144,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client closed: make(chan struct{}), replicaReadSeed: rand.Uint32(), } + store.clientMu.client = reqCollapse{client} store.lockResolver = newLockResolver(store) go store.runSafePointChecker() @@ -205,7 +208,7 @@ func (s *KVStore) Close() error { s.pdClient.Close() close(s.closed) - if err := s.client.Close(); err != nil { + if err := s.GetTiKVClient().Close(); err != nil { return errors.Trace(err) } @@ -312,7 +315,7 @@ func (s *KVStore) SupportDeleteRange() (supported bool) { // SendReq sends a request to region. func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) { - sender := NewRegionRequestSender(s.regionCache, s.client) + sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient()) return sender.SendReq(bo, req, regionID, timeout) } @@ -343,12 +346,16 @@ func (s *KVStore) SetOracle(oracle oracle.Oracle) { // SetTiKVClient resets the client instance. func (s *KVStore) SetTiKVClient(client Client) { - s.client = client + s.clientMu.Lock() + defer s.clientMu.Unlock() + s.clientMu.client = client } // GetTiKVClient gets the client instance. 
func (s *KVStore) GetTiKVClient() (client Client) { - return s.client + s.clientMu.RLock() + defer s.clientMu.RUnlock() + return s.clientMu.client } func (s *KVStore) getSafeTS(storeID uint64) uint64 { diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 7097ba5dbcd3e..ffb47e1fb46fa 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -157,7 +157,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff req := c.buildPrewriteRequest(batch, txnSize) for { - sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) // If we fail to receive response for async commit prewrite, it will be undetermined whether this diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 6c43b7bdee7cd..035291a783aec 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -164,7 +164,7 @@ func (s *Scanner) getData(bo *Backoffer) error { zap.String("nextEndKey", kv.StrKey(s.nextEndKey)), zap.Bool("reverse", s.reverse), zap.Uint64("txnStartTS", s.startTS())) - sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client) + sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient()) var reqEndKey, reqStartKey []byte var loc *KeyLocation var err error diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 38ce24917d1cf..c33a89efc19be 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -123,7 +123,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool Priority: kvrpcpb.CommandPri_Normal, }) - sender := NewRegionRequestSender(s.regionCache, s.client) + sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.regionID, ReadTimeoutShort) batchResp := singleBatchResp{resp: resp} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index e444f5adda7f6..aafaa2b323d24 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -221,22 +221,6 @@ func (txn *KVTxn) Delete(k []byte) error { return txn.us.GetMemBuffer().Delete(k) } -// SetOption sets an option with a value, when val is nil, uses the default -// value of this option. -func (txn *KVTxn) SetOption(opt int, val interface{}) { - txn.us.SetOption(opt, val) -} - -// GetOption returns the option -func (txn *KVTxn) GetOption(opt int) interface{} { - return txn.us.GetOption(opt) -} - -// DelOption deletes an option. -func (txn *KVTxn) DelOption(opt int) { - txn.us.DelOption(opt) -} - // SetSchemaLeaseChecker sets a hook to check schema version. func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) { txn.schemaLeaseChecker = checker diff --git a/store/tikv/unionstore/union_store.go b/store/tikv/unionstore/union_store.go index f9a077d1c1352..08354975e38c5 100644 --- a/store/tikv/unionstore/union_store.go +++ b/store/tikv/unionstore/union_store.go @@ -59,7 +59,6 @@ type uSnapshot interface { type KVUnionStore struct { memBuffer *MemDB snapshot uSnapshot - opts options } // NewUnionStore builds a new unionStore. 
@@ -67,7 +66,6 @@ func NewUnionStore(snapshot uSnapshot) *KVUnionStore { return &KVUnionStore{ snapshot: snapshot, memBuffer: newMemDB(), - opts: make(map[int]interface{}), } } @@ -131,30 +129,8 @@ func (us *KVUnionStore) UnmarkPresumeKeyNotExists(k []byte) { us.memBuffer.UpdateFlags(k, kv.DelPresumeKeyNotExists) } -// SetOption implements the unionStore SetOption interface. -func (us *KVUnionStore) SetOption(opt int, val interface{}) { - us.opts[opt] = val -} - -// DelOption implements the unionStore DelOption interface. -func (us *KVUnionStore) DelOption(opt int) { - delete(us.opts, opt) -} - -// GetOption implements the unionStore GetOption interface. -func (us *KVUnionStore) GetOption(opt int) interface{} { - return us.opts[opt] -} - // SetEntrySizeLimit sets the size limit for each entry and total buffer. func (us *KVUnionStore) SetEntrySizeLimit(entryLimit, bufferLimit uint64) { us.memBuffer.entrySizeLimit = entryLimit us.memBuffer.bufferSizeLimit = bufferLimit } - -type options map[int]interface{} - -func (opts options) Get(opt int) (interface{}, bool) { - v, ok := opts[opt] - return v, ok -} diff --git a/util/ranger/points.go b/util/ranger/points.go index 9c33ccef7feb3..46a4283dd3222 100644 --- a/util/ranger/points.go +++ b/util/ranger/points.go @@ -459,7 +459,7 @@ func handleEnumFromBinOp(sc *stmtctx.StatementContext, ft *types.FieldType, val tmpEnum := types.Enum{} for i := range ft.Elems { tmpEnum.Name = ft.Elems[i] - tmpEnum.Value = uint64(i) + tmpEnum.Value = uint64(i) + 1 d := types.NewMysqlEnumDatum(tmpEnum) if v, err := d.CompareDatum(sc, &val); err == nil { switch op {
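
A minimal, self-contained sketch of the 1-based ENUM numbering that the final hunk in util/ranger/points.go relies on. It does not use TiDB's types package; enumElems and enumValue below are hypothetical stand-ins for ft.Elems and the corrected tmpEnum.Value assignment. MySQL numbers ENUM members from 1 (0 is reserved for the empty/invalid value), so a datum built from the 0-indexed element slice must use i+1 rather than i; that off-by-one is why range points for conditions such as col2 = 'b' previously compared against the wrong member (see the issue 24576 cases added in expression/integration_test.go).

package main

import "fmt"

// enumElems mirrors the role of ft.Elems in the hunk above: a 0-indexed
// slice of member names for an ENUM('a','b','c') column.
var enumElems = []string{"a", "b", "c"}

// enumValue is a hypothetical helper showing the mapping the fix depends on:
// ENUM members are numbered from 1, with 0 reserved for the empty value,
// so the stored value for enumElems[i] is i+1, not i.
func enumValue(i int) uint64 {
	return uint64(i) + 1
}

func main() {
	for i, name := range enumElems {
		fmt.Printf("member %q -> value %d\n", name, enumValue(i))
	}
	// Output:
	// member "a" -> value 1
	// member "b" -> value 2
	// member "c" -> value 3
}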