Skip to content

Commit

Permalink
Merge branch 'master' into 24378-ListInDisk
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge committed May 19, 2021
2 parents 6b077d0 + 0a1c3c0 commit 33f7e91
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 17 deletions.
95 changes: 95 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,101 @@ func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) {
}
}

func (s *partitionTableSuite) TestIdexMerge(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create database test_idx_merge")
tk.MustExec("use test_idx_merge")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")

// list partition table
tk.MustExec(`create table tlist(a int, b int, primary key(a) clustered, index idx_b(b)) partition by list(a)(
partition p0 values in (1, 2, 3, 4),
partition p1 values in (5, 6, 7, 8),
partition p2 values in (9, 10, 11, 12));`)

// range partition table
tk.MustExec(`create table trange(a int, b int, primary key(a) clustered, index idx_b(b)) partition by range(a) (
partition p0 values less than(300),
partition p1 values less than (500),
partition p2 values less than(1100));`)

// hash partition table
tk.MustExec(`create table thash(a int, b int, primary key(a) clustered, index idx_b(b)) partition by hash(a) partitions 4;`)

// regular table
tk.MustExec("create table tregular1(a int, b int, primary key(a) clustered)")
tk.MustExec("create table tregular2(a int, b int, primary key(a) clustered)")

// generate some random data to be inserted
vals := make([]string, 0, 2000)
for i := 0; i < 2000; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000)))
}

tk.MustExec("insert ignore into trange values " + strings.Join(vals, ","))
tk.MustExec("insert ignore into thash values " + strings.Join(vals, ","))
tk.MustExec("insert ignore into tregular1 values " + strings.Join(vals, ","))

vals = make([]string, 0, 2000)
for i := 0; i < 2000; i++ {
vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(12)+1, rand.Intn(20)))
}

tk.MustExec("insert ignore into tlist values " + strings.Join(vals, ","))
tk.MustExec("insert ignore into tregular2 values " + strings.Join(vals, ","))

// test range partition
for i := 0; i < 100; i++ {
x1 := rand.Intn(1099)
x2 := rand.Intn(1099)

queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(trange) */ * from trange where a > %v or b < %v;", x1, x2)
queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b < %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows())

queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(trange) */ * from trange where a > %v or b > %v;", x1, x2)
queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b > %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows())
}

// test hash partition
for i := 0; i < 100; i++ {
x1 := rand.Intn(1099)
x2 := rand.Intn(1099)

queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > %v or b < %v;", x1, x2)
queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregualr1) */ * from tregular1 where a > %v or b < %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows())

queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > %v or b > %v;", x1, x2)
queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b > %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows())
}

// test list partition
for i := 0; i < 100; i++ {
x1 := rand.Intn(12) + 1
x2 := rand.Intn(12) + 1
queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(tlist) */ * from tlist where a > %v or b < %v;", x1, x2)
queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregular2) */ * from tregular2 where a > %v or b < %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows())

queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(tlist) */ * from tlist where a > %v or b > %v;", x1, x2)
queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular2) */ * from tregular2 where a > %v or b > %v;", x1, x2)
c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used
tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows())
}
}

func (s *globalIndexSuite) TestGlobalIndexScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists p")
Expand Down
90 changes: 90 additions & 0 deletions expression/builtin_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,7 @@ func BuildCastFunction4Union(ctx sessionctx.Context, expr Expression, tp *types.

// BuildCastFunction builds a CAST ScalarFunction from the Expression.
func BuildCastFunction(ctx sessionctx.Context, expr Expression, tp *types.FieldType) (res Expression) {
expr = TryPushCastIntoControlFunctionForHybridType(ctx, expr, tp)
var fc functionClass
switch tp.EvalType() {
case types.ETInt:
Expand Down Expand Up @@ -1983,3 +1984,92 @@ func WrapWithCastAsJSON(ctx sessionctx.Context, expr Expression) Expression {
}
return BuildCastFunction(ctx, expr, tp)
}

// TryPushCastIntoControlFunctionForHybridType try to push cast into control function for Hybrid Type.
// If necessary, it will rebuild control function using changed args.
// When a hybrid type is the output of a control function, the result may be as a numeric type to subsequent calculation
// We should perform the `Cast` operation early to avoid using the wrong type for calculation
// For example, the condition `if(1, e, 'a') = 1`, `if` function will output `e` and compare with `1`.
// If the evaltype is ETString, it will get wrong result. So we can rewrite the condition to
// `IfInt(1, cast(e as int), cast('a' as int)) = 1` to get the correct result.
func TryPushCastIntoControlFunctionForHybridType(ctx sessionctx.Context, expr Expression, tp *types.FieldType) (res Expression) {
sf, ok := expr.(*ScalarFunction)
if !ok {
return expr
}

var wrapCastFunc func(ctx sessionctx.Context, expr Expression) Expression
switch tp.EvalType() {
case types.ETInt:
wrapCastFunc = WrapWithCastAsInt
case types.ETReal:
wrapCastFunc = WrapWithCastAsReal
default:
return expr
}

isHybrid := func(ft *types.FieldType) bool {
// todo: compatible with mysql control function using bit type. issue 24725
return ft.Hybrid() && ft.Tp != mysql.TypeBit
}

args := sf.GetArgs()
switch sf.FuncName.L {
case ast.If:
if isHybrid(args[1].GetType()) || isHybrid(args[2].GetType()) {
args[1] = wrapCastFunc(ctx, args[1])
args[2] = wrapCastFunc(ctx, args[2])
f, err := funcs[ast.If].getFunction(ctx, args)
if err != nil {
return expr
}
sf.RetType, sf.Function = f.getRetTp(), f
return sf
}
case ast.Case:
hasHybrid := false
for i := 0; i < len(args)-1; i += 2 {
hasHybrid = hasHybrid || isHybrid(args[i+1].GetType())
}
if len(args)%2 == 1 {
hasHybrid = hasHybrid || isHybrid(args[len(args)-1].GetType())
}
if !hasHybrid {
return expr
}

for i := 0; i < len(args)-1; i += 2 {
args[i+1] = wrapCastFunc(ctx, args[i+1])
}
if len(args)%2 == 1 {
args[len(args)-1] = wrapCastFunc(ctx, args[len(args)-1])
}
f, err := funcs[ast.Case].getFunction(ctx, args)
if err != nil {
return expr
}
sf.RetType, sf.Function = f.getRetTp(), f
return sf
case ast.Elt:
hasHybrid := false
for i := 1; i < len(args); i++ {
hasHybrid = hasHybrid || isHybrid(args[i].GetType())
}
if !hasHybrid {
return expr
}

for i := 1; i < len(args); i++ {
args[i] = wrapCastFunc(ctx, args[i])
}
f, err := funcs[ast.Elt].getFunction(ctx, args)
if err != nil {
return expr
}
sf.RetType, sf.Function = f.getRetTp(), f
return sf
default:
return expr
}
return expr
}
73 changes: 73 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9435,3 +9435,76 @@ func (s *testIntegrationSuite) TestEnumIndex(c *C) {
tk.MustQuery("select /*+ use_index(t,idx) */ col3 from t where col2 = 'b' and col1 is not null;").Check(
testkit.Rows("2"))
}

func (s *testIntegrationSuite) TestControlFunctionWithEnumOrSet(c *C) {
defer s.cleanEnv(c)

// issue 23114
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists e;")
tk.MustExec("create table e(e enum('c', 'b', 'a'));")
tk.MustExec("insert into e values ('a'),('b'),('a'),('b');")
tk.MustQuery("select e from e where if(e>1, e, e);").Sort().Check(
testkit.Rows("a", "a", "b", "b"))
tk.MustQuery("select e from e where case e when 1 then e else e end;").Sort().Check(
testkit.Rows("a", "a", "b", "b"))
tk.MustQuery("select e from e where case 1 when e then e end;").Check(testkit.Rows())

tk.MustQuery("select if(e>1,e,e)='a' from e").Sort().Check(
testkit.Rows("0", "0", "1", "1"))
tk.MustQuery("select if(e>1,e,e)=1 from e").Sort().Check(
testkit.Rows("0", "0", "0", "0"))
// if and if
tk.MustQuery("select if(e>2,e,e) and if(e<=2,e,e) from e;").Sort().Check(
testkit.Rows("1", "1", "1", "1"))
tk.MustQuery("select if(e>2,e,e) and (if(e<3,0,e) or if(e>=2,0,e)) from e;").Sort().Check(
testkit.Rows("0", "0", "1", "1"))
tk.MustQuery("select * from e where if(e>2,e,e) and if(e<=2,e,e);").Sort().Check(
testkit.Rows("a", "a", "b", "b"))
tk.MustQuery("select * from e where if(e>2,e,e) and (if(e<3,0,e) or if(e>=2,0,e));").Sort().Check(
testkit.Rows("a", "a"))

// issue 24494
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int,b enum(\"b\",\"y\",\"1\"));")
tk.MustExec("insert into t values(0,\"y\"),(1,\"b\"),(null,null),(2,\"1\");")
tk.MustQuery("SELECT count(*) FROM t where if(a,b ,null);").Check(testkit.Rows("2"))

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int,b enum(\"b\"),c enum(\"c\"));")
tk.MustExec("insert into t values(1,1,1),(2,1,1),(1,1,1),(2,1,1);")
tk.MustQuery("select a from t where if(a=1,b,c)=\"b\";").Check(testkit.Rows("1", "1"))
tk.MustQuery("select a from t where if(a=1,b,c)=\"c\";").Check(testkit.Rows("2", "2"))
tk.MustQuery("select a from t where if(a=1,b,c)=1;").Sort().Check(testkit.Rows("1", "1", "2", "2"))
tk.MustQuery("select a from t where if(a=1,b,c);").Sort().Check(testkit.Rows("1", "1", "2", "2"))

tk.MustExec("drop table if exists e;")
tk.MustExec("create table e(e enum('c', 'b', 'a'));")
tk.MustExec("insert into e values(3)")
tk.MustQuery("select elt(1,e) = 'a' from e").Check(testkit.Rows("1"))
tk.MustQuery("select elt(1,e) = 3 from e").Check(testkit.Rows("1"))
tk.MustQuery("select e from e where elt(1,e)").Check(testkit.Rows("a"))

// test set type
tk.MustExec("drop table if exists s;")
tk.MustExec("create table s(s set('c', 'b', 'a'));")
tk.MustExec("insert into s values ('a'),('b'),('a'),('b');")
tk.MustQuery("select s from s where if(s>1, s, s);").Sort().Check(
testkit.Rows("a", "a", "b", "b"))
tk.MustQuery("select s from s where case s when 1 then s else s end;").Sort().Check(
testkit.Rows("a", "a", "b", "b"))
tk.MustQuery("select s from s where case 1 when s then s end;").Check(testkit.Rows())

tk.MustQuery("select if(s>1,s,s)='a' from s").Sort().Check(
testkit.Rows("0", "0", "1", "1"))
tk.MustQuery("select if(s>1,s,s)=4 from s").Sort().Check(
testkit.Rows("0", "0", "1", "1"))

tk.MustExec("drop table if exists s;")
tk.MustExec("create table s(s set('c', 'b', 'a'));")
tk.MustExec("insert into s values('a')")
tk.MustQuery("select elt(1,s) = 'a' from s").Check(testkit.Rows("1"))
tk.MustQuery("select elt(1,s) = 4 from s").Check(testkit.Rows("1"))
tk.MustQuery("select s from s where elt(1,s)").Check(testkit.Rows("a"))
}
13 changes: 13 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,19 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a
if len(cnfExpres) == 0 {
return p, nil
}
// check expr field types.
for i, expr := range cnfExpres {
if expr.GetType().EvalType() == types.ETString {
tp := &types.FieldType{
Tp: mysql.TypeDouble,
Flag: expr.GetType().Flag,
Flen: mysql.MaxRealWidth,
Decimal: types.UnspecifiedLength,
}
types.SetBinChsClnFlag(tp)
cnfExpres[i] = expression.TryPushCastIntoControlFunctionForHybridType(b.ctx, expr, tp)
}
}
selection.Conditions = cnfExpres
selection.SetChildren(p)
return selection, nil
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,7 @@ var builtinGlobalVariable = []string{
variable.TiDBAllowFallbackToTiKV,
variable.TiDBEnableDynamicPrivileges,
variable.CTEMaxRecursionDepth,
variable.TiDBDMLBatchSize,
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
Expand Down
10 changes: 10 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4476,3 +4476,13 @@ func (s *testTxnStateSuite) TestRollbacking(c *C) {
c.Assert(tk.Se.TxnInfo().State, Equals, txninfo.TxnRollingBack)
<-ch
}

func (s *testSessionSuite) TestReadDMLBatchSize(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set global tidb_dml_batch_size=1000")
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
// `select 1` to load the global variables.
_, _ = se.Execute(context.TODO(), "select 1")
c.Assert(se.GetSessionVars().DMLBatchSize, Equals, 1000)
}
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBDMLBatchSize, Value: strconv.Itoa(DefDMLBatchSize), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error {
s.DMLBatchSize = int(tidbOptInt64(val, DefOptCorrelationExpFactor))
s.DMLBatchSize = int(tidbOptInt64(val, DefDMLBatchSize))
return nil
}},
{Scope: ScopeSession, Name: TiDBCurrentTS, Value: strconv.Itoa(DefCurretTS), ReadOnly: true},
Expand Down
14 changes: 5 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,15 +739,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
}
bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars)
now, err := c.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
err1 := bo.Backoff(retry.BoPDRPC, err)
if err1 != nil {
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
Expand Down Expand Up @@ -999,7 +995,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// from PD and plus one as our MinCommitTS.
if commitTSMayBeCalculated && c.needLinearizability() {
failpoint.Inject("getMinCommitTSFromTSO", nil)
latestTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
latestTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
// If we fail to get a timestamp from PD, we just propagate the failure
// instead of falling back to the normal 2PC because a normal 2PC will
// also be likely to fail due to the same timestamp issue.
Expand Down
9 changes: 2 additions & 7 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// locks have been cleaned before GC.
expiredLocks := locks

callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return false, errors.Trace(err)
}

txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
Expand All @@ -243,7 +238,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}
Expand All @@ -257,7 +252,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
if err != nil {
return false, err
}
Expand Down

0 comments on commit 33f7e91

Please sign in to comment.