Skip to content

Commit

Permalink
*: Enable plan cache for partitioned tables (#49161)
Browse files Browse the repository at this point in the history
close #33031, close #45532
  • Loading branch information
mjonss authored Mar 12, 2024
1 parent 2d57833 commit ccbab5e
Show file tree
Hide file tree
Showing 54 changed files with 1,700 additions and 1,077 deletions.
11 changes: 9 additions & 2 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,12 +1455,15 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`begin`)
tk3.MustExec(`use test`)
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
// Here it will fail with
// the partition is not in public.
err := tk3.ExecToErr(`insert into test_global values (15,15,15)`)
assert.NotNil(t, err)
require.Error(t, err)
require.ErrorContains(t, err, "the partition is in not in public")
tk2.MustExec(`commit`)
tk3.MustExec(`commit`)
<-syncChan
Expand All @@ -1477,6 +1480,10 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
require.NotNil(t, idxInfo)
cnt = checkGlobalIndexCleanUpDone(t, tk.Session(), tt.Meta(), idxInfo, pid)
require.Equal(t, 3, cnt)
tk.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Point_Get")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
}

func TestGlobalIndexUpdateInTruncatePartition(t *testing.T) {
Expand Down Expand Up @@ -3438,7 +3445,7 @@ func TestExchangeValidateHandleNullValue(t *testing.T) {
tk.MustExec(`alter table t3 EXCHANGE PARTITION p0 WITH TABLE t4`)

tk.MustExec(`CREATE TABLE t5 (id int, c varchar(128)) partition by range (id)(
partition p0 values less than (10),
partition p0 values less than (10),
partition p1 values less than (20),
partition p2 values less than (maxvalue))`)
tk.MustExec(`CREATE TABLE t6 (id int, c varchar(128))`)
Expand Down
18 changes: 10 additions & 8 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

var pointExecutor *PointGetExecutor
var executor exec.Executor
useMaxTS := startTs == math.MaxUint64

// try to reuse point get executor
Expand All @@ -293,24 +293,26 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
executor = exec
}
}

if pointExecutor == nil {
if executor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema)
pointExecutor = b.build(a.Plan).(*PointGetExecutor)
executor = b.build(a.Plan)
if b.err != nil {
return nil, b.err
}
pointExecutor, ok := executor.(*PointGetExecutor)

if useMaxTS {
// Don't cache the executor for non point-get (table dual) or partitioned tables
if ok && useMaxTS && pointExecutor.partitionDefIdx == nil {
a.PsStmt.Executor = pointExecutor
}
}

if err = exec.Open(ctx, pointExecutor); err != nil {
terror.Log(exec.Close(pointExecutor))
if err = exec.Open(ctx, executor); err != nil {
terror.Log(exec.Close(executor))
return nil, err
}

Expand All @@ -330,7 +332,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}

return &recordSet{
executor: pointExecutor,
executor: executor,
stmt: a,
txnStartTS: startTs,
}, nil
Expand Down
136 changes: 54 additions & 82 deletions pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
driver "github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/tikv/client-go/v2/tikvrpc"
Expand All @@ -46,26 +46,27 @@ type BatchPointGetExec struct {
exec.BaseExecutor
indexUsageReporter *exec.IndexUsageReporter

tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []kv.Handle
physIDs []int64
partExpr *tables.PartitionExpr
partPos int
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []kv.Handle
// table/partition IDs for handle or index read
// (can be secondary unique key,
// and need lookup through handle)
planPhysIDs []int64
singlePart bool
partTblID int64
idxVals [][]types.Datum
txn kv.Transaction
lock bool
waitTime int64
inited uint32
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
keepOrder bool
desc bool
batchGetter kv.BatchGetter
// If != 0 then it is a single partition under Static Prune mode.
singlePartID int64
partitionNames []model.CIStr
idxVals [][]types.Datum
txn kv.Transaction
lock bool
waitTime int64
inited uint32
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
keepOrder bool
desc bool
batchGetter kv.BatchGetter

columns []*model.ColumnInfo
// virtualColumnIndex records all the indices of virtual columns and sort them in definition
Expand Down Expand Up @@ -208,15 +209,6 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func datumsContainNull(vals []types.Datum) bool {
for _, val := range vals {
if val.IsNull() {
return true
}
}
return false
}

func (e *BatchPointGetExec) initialize(ctx context.Context) error {
var handleVals map[string][]byte
var indexKeys []kv.Key
Expand All @@ -228,30 +220,13 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
dedup := make(map[hack.MutableString]struct{})
toFetchIndexKeys := make([]kv.Key, 0, len(e.idxVals))
for i, idxVals := range e.idxVals {
// For all x, 'x IN (null)' evaluate to null, so the query get no result.
if datumsContainNull(idxVals) {
continue
}

var physID int64
if e.partPos == core.GlobalWithoutColumnPos {
physID = e.tblInfo.ID
} else {
if len(e.planPhysIDs) > 0 {
physID = e.planPhysIDs[i]
} else {
physID, err = core.GetPhysID(e.tblInfo, e.partExpr, idxVals[e.partPos])
if err != nil {
continue
}
}
physID := e.tblInfo.ID
if e.singlePartID != 0 {
physID = e.singlePartID
} else if len(e.planPhysIDs) > i {
physID = e.planPhysIDs[i]
}

// If this BatchPointGetExec is built only for the specific table partition, skip those filters not matching this partition.
if e.singlePart && e.partTblID != physID {
continue
}
idxKey, err1 := EncodeUniqueIndexKey(e.Ctx(), e.tblInfo, e.idxInfo, idxVals, physID)
idxKey, err1 := plannercore.EncodeUniqueIndexKey(e.Ctx(), e.tblInfo, e.idxInfo, idxVals, physID)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
Expand All @@ -266,6 +241,10 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
toFetchIndexKeys = append(toFetchIndexKeys, idxKey)
}
if e.keepOrder {
// TODO: if multiple partitions, then the IDs needs to be
// in the same order as the index keys
// and should skip table id part when comparing
intest.Assert(e.singlePartID != 0 || len(e.planPhysIDs) <= 1 || e.idxInfo.Global)
slices.SortFunc(toFetchIndexKeys, func(i, j kv.Key) int {
if e.desc {
return j.Cmp(i)
Expand Down Expand Up @@ -296,7 +275,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {

e.handles = make([]kv.Handle, 0, len(toFetchIndexKeys))
if e.tblInfo.Partition != nil {
e.physIDs = make([]int64, 0, len(toFetchIndexKeys))
e.planPhysIDs = e.planPhysIDs[:0]
}
for _, key := range toFetchIndexKeys {
handleVal := handleVals[string(key)]
Expand All @@ -307,10 +286,6 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err1 != nil {
return err1
}
e.handles = append(e.handles, handle)
if rc {
indexKeys = append(indexKeys, key)
}
if e.tblInfo.Partition != nil {
var pid int64
if e.idxInfo.Global {
Expand All @@ -319,15 +294,25 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err != nil {
return err
}
e.physIDs = append(e.physIDs, pid)
if e.singlePartID != 0 && e.singlePartID != pid {
continue
}
if !matchPartitionNames(pid, e.partitionNames, e.tblInfo.GetPartitionInfo()) {
continue
}
e.planPhysIDs = append(e.planPhysIDs, pid)
} else {
pid = tablecodec.DecodeTableID(key)
e.physIDs = append(e.physIDs, pid)
e.planPhysIDs = append(e.planPhysIDs, pid)
}
if e.lock {
e.UpdateDeltaForTableID(pid)
}
}
e.handles = append(e.handles, handle)
if rc {
indexKeys = append(indexKeys, key)
}
}

// The injection is used to simulate following scenario:
Expand Down Expand Up @@ -373,36 +358,23 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}
}
slices.SortFunc(e.handles, less)
// TODO: if partitioned table, sorting the handles would also
// need to have the physIDs rearranged in the same order!
intest.Assert(e.singlePartID != 0 || len(e.planPhysIDs) <= 1)
}

keys := make([]kv.Key, 0, len(e.handles))
newHandles := make([]kv.Handle, 0, len(e.handles))
for i, handle := range e.handles {
var tID int64
if len(e.physIDs) > 0 {
tID = e.physIDs[i]
tID := e.tblInfo.ID
if e.singlePartID != 0 {
tID = e.singlePartID
} else if len(e.planPhysIDs) > 0 {
// Direct handle read
tID = e.planPhysIDs[i]
} else {
if handle.IsInt() {
d := types.NewIntDatum(handle.IntValue())
tID, err = core.GetPhysID(e.tblInfo, e.partExpr, d)
if err != nil {
continue
}
} else {
_, d, err1 := codec.DecodeOne(handle.EncodedCol(e.partPos))
if err1 != nil {
return err1
}
tID, err = core.GetPhysID(e.tblInfo, e.partExpr, d)
if err != nil {
continue
}
}
}
// If this BatchPointGetExec is built only for the specific table partition, skip those handles not matching this partition.
if e.singlePart && e.partTblID != tID {
if tID <= 0 {
// not matching any partition
continue
}
key := tablecodec.EncodeRowKeyWithHandle(tID, handle)
Expand Down
Loading

0 comments on commit ccbab5e

Please sign in to comment.