Skip to content

Commit

Permalink
Merge branch 'remove_lock_index' of github.com:wjhuang2016/tidb into …
Browse files Browse the repository at this point in the history
…remove_lock_index
  • Loading branch information
wjhuang2016 committed Nov 15, 2021
2 parents c70f8ad + be439e2 commit 062915e
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 15 deletions.
9 changes: 6 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ func (ss *Schemas) BackupSchemas(
metaWriter.StartWriteMetasAsync(ctx, op)
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}

workerPool.ApplyOnErrorGroup(errg, func() error {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
logger := log.With(
zap.String("db", schema.dbInfo.Name.O),
zap.String("table", schema.tableInfo.Name.O),
Expand Down
42 changes: 42 additions & 0 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package backup_test

import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"

"github.com/golang/protobuf/proto"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand Down Expand Up @@ -260,3 +263,42 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *
c.Assert(schemas2[0].Info, DeepEquals, schemas[0].Info)
c.Assert(schemas2[0].DB, DeepEquals, schemas[0].DB)
}

func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
es2 := s.GetRandomStorage(c)

systemTablesCount := 32
tablePrefix := "systable"
tk.MustExec("use mysql")
for i := 1; i <= systemTablesCount; i++ {
query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i)
tk.MustExec(query)
}

f, err := filter.Parse([]string{"mysql.systable*"})
c.Assert(err, IsNil)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, systemTablesCount)

ctx := context.Background()
cipher := backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
}
updateCh := new(simpleProgress)

metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher)
err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil,
math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh)
c.Assert(err, IsNil)
err = metaWriter2.FlushBackupMeta(ctx)
c.Assert(err, IsNil)

schemas2 := s.GetSchemasFromMeta(c, es2)
c.Assert(schemas2, HasLen, systemTablesCount)
for _, schema := range schemas2 {
c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql"))
c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true)
}
}
50 changes: 48 additions & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ type BatchPointGetExec struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
cacheTable kv.MemBuffer
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand Down Expand Up @@ -115,6 +116,9 @@ func (e *BatchPointGetExec) Open(context.Context) error {
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
}
if e.cacheTable != nil {
snapshot = cacheTableSnapshot{snapshot, e.cacheTable}
}
if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
Expand Down Expand Up @@ -162,6 +166,48 @@ func (e *BatchPointGetExec) Open(context.Context) error {
return nil
}

// CacheTable always use memBuffer in session as snapshot.
// cacheTableSnapshot inherits kv.Snapshot and override the BatchGet methods and Get methods.
type cacheTableSnapshot struct {
kv.Snapshot
memBuffer kv.MemBuffer
}

func (s cacheTableSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
values := make(map[string][]byte)
if s.memBuffer == nil {
return values, nil
}

for _, key := range keys {
val, err := s.memBuffer.Get(ctx, key)
if kv.ErrNotExist.Equal(err) {
continue
}

if err != nil {
return nil, err
}

if len(val) == 0 {
continue
}

values[string(key)] = val
}

return values, nil
}

func (s cacheTableSnapshot) Get(ctx context.Context, key kv.Key) ([]byte, error) {
return s.memBuffer.Get(ctx, key)
}

// MockNewCacheTableSnapShot only serves for test.
func MockNewCacheTableSnapShot(snapshot kv.Snapshot, memBuffer kv.MemBuffer) *cacheTableSnapshot {
return &cacheTableSnapshot{snapshot, memBuffer}
}

// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
Expand Down
31 changes: 31 additions & 0 deletions executor/batch_point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package executor_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestBatchPointGetExec(t *testing.T) {
Expand Down Expand Up @@ -337,3 +341,30 @@ func TestBatchPointGetIssue25167(t *testing.T) {
tk.MustExec("insert into t values (1)")
tk.MustQuery("select * from t as of timestamp @a where a in (1,2,3)").Check(testkit.Rows())
}

func TestCacheSnapShot(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
se := tk.Session()
ctx := context.Background()
txn, err := se.GetStore().Begin(tikv.WithStartTS(0))
memBuffer := txn.GetMemBuffer()
require.NoError(t, err)
var keys []kv.Key
for i := 0; i < 2; i++ {
keys = append(keys, []byte(string(rune(i))))
}
err = memBuffer.Set(keys[0], []byte("1111"))
require.NoError(t, err)
err = memBuffer.Set(keys[1], []byte("2222"))
require.NoError(t, err)
cacheTableSnapShot := executor.MockNewCacheTableSnapShot(nil, memBuffer)
get, err := cacheTableSnapShot.Get(ctx, keys[0])
require.NoError(t, err)
require.Equal(t, get, []byte("1111"))
batchGet, err := cacheTableSnapShot.BatchGet(ctx, keys)
require.NoError(t, err)
require.Equal(t, batchGet[string(keys[0])], []byte("1111"))
require.Equal(t, batchGet[string(keys[1])], []byte("2222"))
}
34 changes: 33 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
Expand Down Expand Up @@ -4362,7 +4363,9 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
partTblID: plan.PartTblID,
columns: plan.Columns,
}

if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, startTS)
}
if plan.TblInfo.TempTableType != model.TempTableNone {
// Temporary table should not do any lock operations
e.lock = false
Expand Down Expand Up @@ -4661,3 +4664,32 @@ func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error

return nil
}

func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64) kv.MemBuffer {
tbl, ok := b.is.TableByID(tblInfo.ID)
if !ok {
b.err = errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(b.ctx.GetSessionVars().CurrentDB, tblInfo.Name))
return nil
}
cacheData := tbl.(table.CachedTable).TryReadFromCache(startTS)
if cacheData != nil {
b.ctx.GetSessionVars().StmtCtx.ReadFromTableCache = true
return cacheData
}
go func() {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("panic in the recoverable goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
}()
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt {
err := tbl.(table.CachedTable).UpdateLockForRead(b.ctx.GetStore(), startTS)
if err != nil {
log.Warn("Update Lock Info Error")
}
}
}()
return nil
}
10 changes: 9 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
}

if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(p.TblInfo, startTS)
}
e.base().initCap = 1
e.base().maxChunkSize = 1
e.Init(p, startTS)
Expand Down Expand Up @@ -96,7 +100,8 @@ type PointGetExecutor struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *runtimeStatsWithSnapshot
stats *runtimeStatsWithSnapshot
cacheTable kv.MemBuffer
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand Down Expand Up @@ -150,6 +155,9 @@ func (e *PointGetExecutor) Open(context.Context) error {
} else {
e.snapshot = e.ctx.GetSnapshotWithTS(snapshotTS)
}
if e.cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, e.cacheTable}
}
if err := e.verifyTxnScope(); err != nil {
return err
}
Expand Down
12 changes: 10 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,8 +1477,16 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p Plan) (bo
// If the PointGetPlan needs to read data using unique index (double read), we
// can't use max uint64, because using math.MaxUint64 can't guarantee repeatable-read
// and the data and index would be inconsistent!
isPointGet := v.IndexInfo == nil || (v.IndexInfo.Primary && v.TblInfo.IsCommonHandle)
return isPointGet, nil
// If the PointGetPlan needs to read data from Cache Table, we can't use max uint64,
// because math.MaxUint64 always make cacheData invalid.
noSecondRead := v.IndexInfo == nil || (v.IndexInfo.Primary && v.TblInfo.IsCommonHandle)
if !noSecondRead {
return false, nil
}
if v.TblInfo != nil && (v.TblInfo.TableCacheStatusType != model.TableCacheStatusDisable) {
return false, nil
}
return true, nil
default:
return false, nil
}
Expand Down
25 changes: 23 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4642,8 +4642,9 @@ func (s *testIntegrationSerialSuite) TestPushDownGroupConcatToTiFlash(c *C) {

var input []string
var output []struct {
SQL string
Plan []string
SQL string
Plan []string
Warning []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
Expand All @@ -4653,6 +4654,26 @@ func (s *testIntegrationSerialSuite) TestPushDownGroupConcatToTiFlash(c *C) {
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))

comment := Commentf("case:%v sql:%s", i, tt)
warnings := tk.Se.GetSessionVars().StmtCtx.GetWarnings()
s.testData.OnRecord(func() {
if len(warnings) > 0 {
output[i].Warning = make([]string, len(warnings))
for j, warning := range warnings {
output[i].Warning[j] = warning.Err.Error()
}
}
})
if len(output[i].Warning) == 0 {
c.Assert(len(warnings), Equals, 0, comment)
} else {
c.Assert(len(warnings), Equals, len(output[i].Warning), comment)
for j, warning := range warnings {
c.Assert(warning.Level, Equals, stmtctx.WarnLevelWarning, comment)
c.Assert(warning.Err.Error(), Equals, output[i].Warning[j], comment)
}
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,18 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc
ret = false
break
}
orderBySize := len(aggFunc.OrderByItems)
if orderBySize > 0 {
exprs := make([]expression.Expression, 0, orderBySize)
for _, item := range aggFunc.OrderByItems {
exprs = append(exprs, item.Expr)
}
if !expression.CanExprsPushDownWithExtraInfo(sc, exprs, client, storeType, false) {
reason = "arguments of AggFunc `" + aggFunc.Name + "` contains unsupported exprs in order-by clause"
ret = false
break
}
}
pb := aggregation.AggFuncToPBExpr(sctx, client, aggFunc)
if pb == nil {
reason = "AggFunc `" + aggFunc.Name + "` can not be converted to pb expr"
Expand Down
3 changes: 2 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(col_0, col_0,id) from ts group by id",
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(distinct col_0 order by id<10) from ts",
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(distinct col_0 order by id<10) from ts group by col_1",
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(distinct col_0>10 order by id<10) from ts group by col_1"
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(distinct col_0>10 order by id<10) from ts group by col_1",
"desc format = 'brief' select /*+ hash_agg(),agg_to_cop() */ group_concat(distinct col_0 order by col_0<=>null) from ts"
]
},
{
Expand Down
Loading

0 comments on commit 062915e

Please sign in to comment.