Skip to content

Commit

Permalink
planner: refactor a few code of plan cache (#54464)
Browse files Browse the repository at this point in the history
ref #54057
  • Loading branch information
qw4990 committed Jul 5, 2024
1 parent 0c9a679 commit d19fc99
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 215 deletions.
22 changes: 11 additions & 11 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,21 +221,21 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
}
}

var matchOpts *PlanCacheMatchOpts
var paramTypes []*types.FieldType
if stmtCtx.UseCache() {
var cacheVal kvcache.Value
var hit, isPointPlan bool
if stmt.PointGet.pointPlan != nil { // if it's PointGet Plan, no need to use MatchOpts
if stmt.PointGet.pointPlan != nil { // if it's PointGet Plan, no need to use paramTypes
cacheVal = &PlanCacheValue{
Plan: stmt.PointGet.pointPlan,
OutputColumns: stmt.PointGet.columnNames,
stmtHints: stmt.PointGet.pointPlanHints,
}
isPointPlan, hit = true, true
} else {
matchOpts = GetMatchOpts(sctx, params)
paramTypes = parseParamTypes(sctx, params)
// TODO: consider instance-level plan cache
cacheVal, hit = sctx.GetSessionPlanCache().Get(cacheKey, matchOpts)
cacheVal, hit = sctx.GetSessionPlanCache().Get(cacheKey, paramTypes)
}
if hit {
if intest.InTest && ctx.Value(PlanCacheKeyTestBeforeAdjust{}) != nil {
Expand All @@ -249,11 +249,11 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,
}
}
}
if matchOpts == nil {
matchOpts = GetMatchOpts(sctx, params)
if paramTypes == nil {
paramTypes = parseParamTypes(sctx, params)
}

return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, matchOpts)
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, paramTypes)
}

func adjustCachedPlan(sctx sessionctx.Context, cachedVal *PlanCacheValue, isNonPrepared, isPointPlan bool,
Expand Down Expand Up @@ -288,7 +288,7 @@ func adjustCachedPlan(sctx sessionctx.Context, cachedVal *PlanCacheValue, isNonP
// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, cacheKey string, matchOpts *PlanCacheMatchOpts) (base.Plan, []*types.FieldName, error) {
stmt *PlanCacheStmt, cacheKey string, paramTypes []*types.FieldType) (base.Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
Expand All @@ -303,18 +303,18 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared

// check whether this plan is cacheable.
if stmtCtx.UseCache() {
if cacheable, reason := isPlanCacheable(sctx.GetPlanCtx(), p, len(matchOpts.ParamTypes), len(stmt.limits), stmt.hasSubquery); !cacheable {
if cacheable, reason := isPlanCacheable(sctx.GetPlanCtx(), p, len(paramTypes), len(stmt.limits), stmt.hasSubquery); !cacheable {
stmtCtx.SetSkipPlanCache(reason)
}
}

// put this plan into the plan cache.
if stmtCtx.UseCache() {
cached := NewPlanCacheValue(p, names, matchOpts, &stmtCtx.StmtHints)
cached := NewPlanCacheValue(p, names, paramTypes, &stmtCtx.StmtHints)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
sctx.GetSessionPlanCache().Put(cacheKey, cached, matchOpts)
sctx.GetSessionPlanCache().Put(cacheKey, cached, paramTypes)
if _, ok := p.(*PointGetPlan); ok {
stmt.PointGet.pointPlan = p
stmt.PointGet.columnNames = names
Expand Down
18 changes: 7 additions & 11 deletions pkg/planner/core/plan_cache_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,18 @@ func (pc *instancePlanCache) getHead(key string, create bool) *instancePCNode {
return nil
}

// Get gets the cached value according to key and opts.
func (pc *instancePlanCache) Get(sctx sessionctx.Context, key string, opts any) (value any, ok bool) {
// Get gets the cached value according to key and paramTypes.
func (pc *instancePlanCache) Get(key string, paramTypes any) (value any, ok bool) {
headNode := pc.getHead(key, false)
if headNode == nil { // cache miss
return nil, false
}
return pc.getPlanFromList(sctx, headNode, opts)
return pc.getPlanFromList(headNode, paramTypes)
}

func (*instancePlanCache) getPlanFromList(sctx sessionctx.Context, headNode *instancePCNode, opts any) (any, bool) {
func (*instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes any) (any, bool) {
for node := headNode.next.Load(); node != nil; node = node.next.Load() {
var matchOpts *PlanCacheMatchOpts
if opts != nil {
matchOpts = opts.(*PlanCacheMatchOpts)
}
if matchCachedPlan(sctx, node.value, matchOpts) { // v.Plan is read-only, no need to lock
if checkTypesCompatibility4PC(node.value.paramTypes, paramTypes) { // v.Plan is read-only, no need to lock
node.lastUsed.Store(time.Now()) // atomically update the lastUsed field
return node.value, true
}
Expand All @@ -99,7 +95,7 @@ func (*instancePlanCache) getPlanFromList(sctx sessionctx.Context, headNode *ins

// Put puts the key and values into the cache.
// Due to some thread-safety issues, this Put operation might fail, use the returned succ to indicate it.
func (pc *instancePlanCache) Put(sctx sessionctx.Context, key string, value, opts any) (succ bool) {
func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool) {
vMem := value.(*PlanCacheValue).MemoryUsage()
if vMem+pc.totCost.Load() > pc.hardMemLimit.Load() {
return // do nothing if it exceeds the hard limit
Expand All @@ -108,7 +104,7 @@ func (pc *instancePlanCache) Put(sctx sessionctx.Context, key string, value, opt
if headNode == nil {
return false // for safety
}
if _, ok := pc.getPlanFromList(sctx, headNode, opts); ok {
if _, ok := pc.getPlanFromList(headNode, paramTypes); ok {
return // some other thread has inserted the same plan before
}

Expand Down
176 changes: 88 additions & 88 deletions pkg/planner/core/plan_cache_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ import (
"github.com/stretchr/testify/require"
)

func _put(sctx sessionctx.Context, pc sessionctx.InstancePlanCache, testKey, memUsage, statsHash int64) (succ bool) {
v := &PlanCacheValue{testKey: testKey, memoryUsage: memUsage, matchOpts: &PlanCacheMatchOpts{}}
return pc.Put(sctx, fmt.Sprintf("%v-%v", testKey, statsHash), v, &PlanCacheMatchOpts{})
func _put(pc sessionctx.InstancePlanCache, testKey, memUsage, statsHash int64) (succ bool) {
v := &PlanCacheValue{testKey: testKey, memoryUsage: memUsage}
return pc.Put(fmt.Sprintf("%v-%v", testKey, statsHash), v, nil)
}

func _hit(t *testing.T, sctx sessionctx.Context, pc sessionctx.InstancePlanCache, testKey, statsHash int) {
v, ok := pc.Get(sctx, fmt.Sprintf("%v-%v", testKey, statsHash), &PlanCacheMatchOpts{})
func _hit(t *testing.T, pc sessionctx.InstancePlanCache, testKey, statsHash int) {
v, ok := pc.Get(fmt.Sprintf("%v-%v", testKey, statsHash), nil)
require.True(t, ok)
require.Equal(t, v.(*PlanCacheValue).testKey, int64(testKey))
}

func _miss(t *testing.T, sctx sessionctx.Context, pc sessionctx.InstancePlanCache, testKey, statsHash int) {
_, ok := pc.Get(sctx, fmt.Sprintf("%v-%v", testKey, statsHash), &PlanCacheMatchOpts{})
func _miss(t *testing.T, pc sessionctx.InstancePlanCache, testKey, statsHash int) {
_, ok := pc.Get(fmt.Sprintf("%v-%v", testKey, statsHash), nil)
require.False(t, ok)
}

Expand All @@ -50,64 +50,64 @@ func TestInstancePlanCacheBasic(t *testing.T) {
}()

pc := NewInstancePlanCache(1000, 1000)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 2, 100, 0)
_put(sctx, pc, 3, 100, 0)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
require.Equal(t, pc.MemUsage(), int64(300))
_hit(t, sctx, pc, 1, 0)
_hit(t, sctx, pc, 2, 0)
_hit(t, sctx, pc, 3, 0)
_hit(t, pc, 1, 0)
_hit(t, pc, 2, 0)
_hit(t, pc, 3, 0)

// exceed the hard limit during Put
pc = NewInstancePlanCache(250, 250)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 2, 100, 0)
_put(sctx, pc, 3, 100, 0)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
require.Equal(t, pc.MemUsage(), int64(200))
_hit(t, sctx, pc, 1, 0)
_hit(t, sctx, pc, 2, 0)
_miss(t, sctx, pc, 3, 0)
_hit(t, pc, 1, 0)
_hit(t, pc, 2, 0)
_miss(t, pc, 3, 0)

// can't Put 2 same values
pc = NewInstancePlanCache(250, 250)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 1, 101, 0)
_put(pc, 1, 100, 0)
_put(pc, 1, 101, 0)
require.Equal(t, pc.MemUsage(), int64(100)) // the second one will be ignored

// eviction
pc = NewInstancePlanCache(320, 500)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 2, 100, 0)
_put(sctx, pc, 3, 100, 0)
_put(sctx, pc, 4, 100, 0)
_put(sctx, pc, 5, 100, 0)
_hit(t, sctx, pc, 1, 0) // access 1-3 to refresh their last_used
_hit(t, sctx, pc, 2, 0)
_hit(t, sctx, pc, 3, 0)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
_put(pc, 4, 100, 0)
_put(pc, 5, 100, 0)
_hit(t, pc, 1, 0) // access 1-3 to refresh their last_used
_hit(t, pc, 2, 0)
_hit(t, pc, 3, 0)
require.Equal(t, pc.Evict(), true)
require.Equal(t, pc.MemUsage(), int64(300))
_hit(t, sctx, pc, 1, 0) // access 1-3 to refresh their last_used
_hit(t, sctx, pc, 2, 0)
_hit(t, sctx, pc, 3, 0)
_miss(t, sctx, pc, 4, 0) // 4-5 have been evicted
_miss(t, sctx, pc, 5, 0)
_hit(t, pc, 1, 0) // access 1-3 to refresh their last_used
_hit(t, pc, 2, 0)
_hit(t, pc, 3, 0)
_miss(t, pc, 4, 0) // 4-5 have been evicted
_miss(t, pc, 5, 0)

// no need to eviction if mem < softLimit
pc = NewInstancePlanCache(320, 500)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 2, 100, 0)
_put(sctx, pc, 3, 100, 0)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
require.Equal(t, pc.Evict(), false)
require.Equal(t, pc.MemUsage(), int64(300))
_hit(t, sctx, pc, 1, 0)
_hit(t, sctx, pc, 2, 0)
_hit(t, sctx, pc, 3, 0)
_hit(t, pc, 1, 0)
_hit(t, pc, 2, 0)
_hit(t, pc, 3, 0)

// empty head should be dropped after eviction
pc = NewInstancePlanCache(1, 500)
_put(sctx, pc, 1, 100, 0)
_put(sctx, pc, 2, 100, 0)
_put(sctx, pc, 3, 100, 0)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
require.Equal(t, pc.MemUsage(), int64(300))
pcImpl := pc.(*instancePlanCache)
numHeads := 0
Expand All @@ -129,58 +129,58 @@ func TestInstancePlanCacheWithMatchOpts(t *testing.T) {

// same key with different statsHash
pc := NewInstancePlanCache(1000, 1000)
_put(sctx, pc, 1, 100, 1)
_put(sctx, pc, 1, 100, 2)
_put(sctx, pc, 1, 100, 3)
_hit(t, sctx, pc, 1, 1)
_hit(t, sctx, pc, 1, 2)
_hit(t, sctx, pc, 1, 3)
_miss(t, sctx, pc, 1, 4)
_miss(t, sctx, pc, 2, 1)
_put(pc, 1, 100, 1)
_put(pc, 1, 100, 2)
_put(pc, 1, 100, 3)
_hit(t, pc, 1, 1)
_hit(t, pc, 1, 2)
_hit(t, pc, 1, 3)
_miss(t, pc, 1, 4)
_miss(t, pc, 2, 1)

// multiple keys with same statsHash
pc = NewInstancePlanCache(1000, 1000)
_put(sctx, pc, 1, 100, 1)
_put(sctx, pc, 1, 100, 2)
_put(sctx, pc, 2, 100, 1)
_put(sctx, pc, 2, 100, 2)
_hit(t, sctx, pc, 1, 1)
_hit(t, sctx, pc, 1, 2)
_miss(t, sctx, pc, 1, 3)
_hit(t, sctx, pc, 2, 1)
_hit(t, sctx, pc, 2, 2)
_miss(t, sctx, pc, 2, 3)
_miss(t, sctx, pc, 3, 1)
_miss(t, sctx, pc, 3, 2)
_miss(t, sctx, pc, 3, 3)
_put(pc, 1, 100, 1)
_put(pc, 1, 100, 2)
_put(pc, 2, 100, 1)
_put(pc, 2, 100, 2)
_hit(t, pc, 1, 1)
_hit(t, pc, 1, 2)
_miss(t, pc, 1, 3)
_hit(t, pc, 2, 1)
_hit(t, pc, 2, 2)
_miss(t, pc, 2, 3)
_miss(t, pc, 3, 1)
_miss(t, pc, 3, 2)
_miss(t, pc, 3, 3)

// hard limit can take effect in this case
pc = NewInstancePlanCache(200, 200)
_put(sctx, pc, 1, 100, 1)
_put(sctx, pc, 1, 100, 2)
_put(sctx, pc, 1, 100, 3) // the third one will be ignored
_put(pc, 1, 100, 1)
_put(pc, 1, 100, 2)
_put(pc, 1, 100, 3) // the third one will be ignored
require.Equal(t, pc.MemUsage(), int64(200))
_hit(t, sctx, pc, 1, 1)
_hit(t, sctx, pc, 1, 2)
_miss(t, sctx, pc, 1, 3)
_hit(t, pc, 1, 1)
_hit(t, pc, 1, 2)
_miss(t, pc, 1, 3)

// eviction this case
pc = NewInstancePlanCache(300, 500)
_put(sctx, pc, 1, 100, 1)
_put(sctx, pc, 1, 100, 2)
_put(sctx, pc, 1, 100, 3)
_put(sctx, pc, 1, 100, 4)
_put(sctx, pc, 1, 100, 5)
_hit(t, sctx, pc, 1, 1) // refresh 1-3's last_used
_hit(t, sctx, pc, 1, 2)
_hit(t, sctx, pc, 1, 3)
_put(pc, 1, 100, 1)
_put(pc, 1, 100, 2)
_put(pc, 1, 100, 3)
_put(pc, 1, 100, 4)
_put(pc, 1, 100, 5)
_hit(t, pc, 1, 1) // refresh 1-3's last_used
_hit(t, pc, 1, 2)
_hit(t, pc, 1, 3)
require.True(t, pc.Evict())
require.Equal(t, pc.MemUsage(), int64(300))
_hit(t, sctx, pc, 1, 1)
_hit(t, sctx, pc, 1, 2)
_hit(t, sctx, pc, 1, 3)
_miss(t, sctx, pc, 1, 4)
_miss(t, sctx, pc, 1, 5)
_hit(t, pc, 1, 1)
_hit(t, pc, 1, 2)
_hit(t, pc, 1, 3)
_miss(t, pc, 1, 4)
_miss(t, pc, 1, 5)
}

func TestInstancePlanCacheConcurrentRead(t *testing.T) {
Expand All @@ -195,7 +195,7 @@ func TestInstancePlanCacheConcurrentRead(t *testing.T) {
for k := 0; k < 100; k++ {
for statsHash := 0; statsHash < 100; statsHash++ {
if rand.Intn(10) < 7 {
_put(sctx, pc, int64(k), 1, int64(statsHash))
_put(pc, int64(k), 1, int64(statsHash))
flag[k][statsHash] = true
}
}
Expand All @@ -209,9 +209,9 @@ func TestInstancePlanCacheConcurrentRead(t *testing.T) {
for i := 0; i < 10000; i++ {
k, statsHash := rand.Intn(100), rand.Intn(100)
if flag[k][statsHash] {
_hit(t, sctx, pc, k, statsHash)
_hit(t, pc, k, statsHash)
} else {
_miss(t, sctx, pc, k, statsHash)
_miss(t, pc, k, statsHash)
}
time.Sleep(time.Nanosecond * 10)
}
Expand All @@ -235,7 +235,7 @@ func TestInstancePlanCacheConcurrentWriteRead(t *testing.T) {
defer wg.Done()
for i := 0; i < 1000; i++ {
k, statsHash := rand.Intn(100), rand.Intn(100)
if _put(sctx, pc, int64(k), 1, int64(statsHash)) {
if _put(pc, int64(k), 1, int64(statsHash)) {
flag[k][statsHash].Store(true)
}
time.Sleep(time.Nanosecond * 10)
Expand All @@ -249,7 +249,7 @@ func TestInstancePlanCacheConcurrentWriteRead(t *testing.T) {
for i := 0; i < 2000; i++ {
k, statsHash := rand.Intn(100), rand.Intn(100)
if flag[k][statsHash].Load() {
_hit(t, sctx, pc, k, statsHash)
_hit(t, pc, k, statsHash)
}
time.Sleep(time.Nanosecond * 5)
}
Expand Down
Loading

0 comments on commit d19fc99

Please sign in to comment.