session: read local dc replicas automatically for stale read (#25525)
* fix select

Signed-off-by: yisaer <disxiaofei@163.com>
Yisaer committed Jun 18, 2021
1 parent 9a3ed70 commit 799591a
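
Before the diff, a minimal sketch of the routing rule this commit adds (not the actual TiDB code): when a stale read runs under a non-global txn_scope, the request is pinned to stores whose DC label matches that scope, so it is served by replicas in the local data center. The label key below is an assumption standing in for placement.DCLabelKey.

package main

import "fmt"

// StoreLabel mirrors the key/value pair used to match TiKV stores.
type StoreLabel struct {
	Key   string
	Value string
}

const (
	globalTxnScope = "global" // default scope when none is set
	dcLabelKey     = "zone"   // assumed DC label key, standing in for placement.DCLabelKey
)

// storeLabelsFor returns the store-selector labels for a request, or nil when
// the request should not be restricted to a single DC.
func storeLabelsFor(isStaleness bool, txnScope string) []StoreLabel {
	if txnScope == "" {
		txnScope = globalTxnScope
	}
	if isStaleness && txnScope != globalTxnScope {
		return []StoreLabel{{Key: dcLabelKey, Value: txnScope}}
	}
	return nil
}

func main() {
	fmt.Println(storeLabelsFor(true, "bj"))  // [{zone bj}]: stale read served by "bj" replicas
	fmt.Println(storeLabelsFor(true, ""))    // []: global scope, no restriction
	fmt.Println(storeLabelsFor(false, "bj")) // []: not a stale read
}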
Showing 20 changed files with 251 additions and 84 deletions.
61 changes: 41 additions & 20 deletions distsql/request_builder.go
@@ -19,6 +19,7 @@ import (
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
@@ -33,20 +34,38 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
)

// RequestBuilder is used to build a "kv.Request".
// It is called before we issue a kv request by "Select".
type RequestBuilder struct {
kv.Request
// txnScope indicates the value of txn_scope
txnScope string
is infoschema.InfoSchema
err error
is infoschema.InfoSchema
err error
}

// Build builds a "kv.Request".
func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.TxnScope == "" {
builder.TxnScope = oracle.GlobalTxnScope
}
if builder.IsStaleness && builder.TxnScope != kv.GlobalTxnScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: builder.TxnScope,
},
}
}
failpoint.Inject("assertRequestBuilderStalenessOption", func(val failpoint.Value) {
assertScope := val.(string)
if len(assertScope) > 0 {
if builder.IsStaleness && assertScope != builder.TxnScope {
panic("batch point get staleness option fail")
}
}
})
err := builder.verifyTxnScope()
if err != nil {
builder.err = err
@@ -229,16 +248,6 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
builder.txnScope = sv.TxnCtx.TxnScope
builder.IsStaleness = sv.TxnCtx.IsStaleness
if builder.IsStaleness && builder.txnScope != kv.GlobalTxnScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: builder.txnScope,
},
}
}
builder.SetResourceGroupTag(sv.StmtCtx)
return builder
}
@@ -284,10 +293,10 @@ func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext)
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = kv.GlobalTxnScope
if builder.TxnScope == "" {
builder.TxnScope = kv.GlobalTxnScope
}
if builder.txnScope == kv.GlobalTxnScope || builder.is == nil {
if builder.TxnScope == kv.GlobalTxnScope || builder.is == nil {
return nil
}
visitPhysicalTableID := make(map[int64]struct{})
@@ -301,7 +310,7 @@ func (builder *RequestBuilder) verifyTxnScope() error {
}

for phyTableID := range visitPhysicalTableID {
valid := VerifyTxnScope(builder.txnScope, phyTableID, builder.is)
valid := VerifyTxnScope(builder.TxnScope, phyTableID, builder.is)
if !valid {
var tblName string
var partName string
@@ -313,17 +322,29 @@ func (builder *RequestBuilder) verifyTxnScope() error {
tblInfo, _ = builder.is.TableByID(phyTableID)
tblName = tblInfo.Meta().Name.String()
}
err := fmt.Errorf("table %v can not be read by %v txn_scope", tblName, builder.txnScope)
err := fmt.Errorf("table %v can not be read by %v txn_scope", tblName, builder.TxnScope)
if len(partName) > 0 {
err = fmt.Errorf("table %v's partition %v can not be read by %v txn_scope",
tblName, partName, builder.txnScope)
tblName, partName, builder.TxnScope)
}
return err
}
}
return nil
}

// SetTxnScope sets request TxnScope
func (builder *RequestBuilder) SetTxnScope(scope string) *RequestBuilder {
builder.TxnScope = scope
return builder
}

// SetIsStaleness sets request IsStaleness
func (builder *RequestBuilder) SetIsStaleness(is bool) *RequestBuilder {
builder.IsStaleness = is
return builder
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if !isCommonHandle {
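
A small usage sketch of the two new setters (assuming the distsql package at this commit; not code from the diff): callers that already know the statement's scope chain them before Build(), which then defaults the scope and fills MatchStoreLabels as in the hunk above.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/distsql"
)

func main() {
	// A deliberately bare request, just to show the new options; a real caller
	// (see executor/builder.go below) also sets the DAG request, key ranges,
	// and session variables before Build().
	var builder distsql.RequestBuilder
	kvReq, err := builder.
		SetTxnScope("bj").    // the session's txn scope, e.g. the local DC
		SetIsStaleness(true). // a stale ("AS OF TIMESTAMP") read
		Build()               // defaults TxnScope and sets MatchStoreLabels
	if err != nil {
		panic(err)
	}
	// Expected to print something like: bj true 1 (one DC label selector attached).
	fmt.Println(kvReq.TxnScope, kvReq.IsStaleness, len(kvReq.MatchStoreLabels))
}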
9 changes: 9 additions & 0 deletions distsql/request_builder_test.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
)

var _ = Suite(&testSuite{})
@@ -323,6 +324,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
@@ -399,6 +401,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
@@ -446,6 +449,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
@@ -493,6 +497,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
@@ -536,6 +541,7 @@ func (s *testSuite) TestRequestBuilder5(c *C) {
NotFillCache: true,
SyncLog: false,
Streaming: false,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
@@ -569,6 +575,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) {
NotFillCache: true,
SyncLog: false,
Streaming: false,
TxnScope: oracle.GlobalTxnScope,
}

c.Assert(actual, DeepEquals, expect)
@@ -603,6 +610,7 @@ func (s *testSuite) TestRequestBuilder7(c *C) {
SyncLog: false,
Streaming: false,
ReplicaRead: replicaRead,
TxnScope: oracle.GlobalTxnScope,
}

c.Assert(actual, DeepEquals, expect)
@@ -624,6 +632,7 @@ func (s *testSuite) TestRequestBuilder8(c *C) {
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
TxnScope: oracle.GlobalTxnScope,
}
c.Assert(actual, DeepEquals, expect)
}
7 changes: 5 additions & 2 deletions executor/adapter.go
@@ -191,6 +191,8 @@ type ExecStmt struct {
SnapshotTS uint64
// ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly.
ExplicitStaleness bool
// TxnScope indicates the txn scope used by the store selector when the request is sent
TxnScope string
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
@@ -245,7 +247,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
@@ -291,6 +293,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.LastSnapshotTS
a.ExplicitStaleness = ret.ExplicitStaleness
a.TxnScope = ret.TxnScope
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
return 0, err
@@ -792,7 +795,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
20 changes: 15 additions & 5 deletions executor/batch_point_get.go
@@ -55,6 +55,8 @@ type BatchPointGetExec struct {
partTblID int64
idxVals [][]types.Datum
startTS uint64
txnScope string
isStaleness bool
snapshotTS uint64
txn kv.Transaction
lock bool
@@ -124,14 +126,22 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.TxnScope, e.txnScope)
snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness)
failpoint.Inject("assertBatchPointStalenessOption", func(val failpoint.Value) {
assertScope := val.(string)
if len(assertScope) > 0 {
if e.isStaleness && assertScope != e.txnScope {
panic("batch point get staleness option fail")
}
}
})

if e.isStaleness && e.txnScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.ctx.GetSessionVars().TxnCtx.TxnScope,
Value: e.txnScope,
},
})
}
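
A hypothetical test-side sketch (not part of this commit) of how the failpoints added above could be exercised: enabling assertBatchPointStalenessOption with a DC name makes Open() panic if a stale-read batch point get carries a different txn scope. The failpoint path follows the usual "<package import path>/<failpoint name>" convention; the concrete test wiring is an assumption.

package executor_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestBatchPointGetStalenessAssertion(t *testing.T) {
	fp := "github.com/pingcap/tidb/executor/assertBatchPointStalenessOption"
	// The injected string is the txn scope the executor is expected to carry.
	if err := failpoint.Enable(fp, `return("bj")`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()

	// ... run a stale batch point get here, e.g.
	//   SELECT * FROM t AS OF TIMESTAMP NOW() - INTERVAL 1 SECOND WHERE id IN (1, 2)
	// through a session pinned to the "bj" scope; Open() panics if the
	// executor was built with a different scope.
}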
11 changes: 6 additions & 5 deletions executor/benchmark_test.go
@@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap/zapcore"
)

@@ -290,7 +291,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
@@ -342,7 +343,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil, 0, false)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
return b.build(plan)
}

@@ -575,7 +576,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil, 0, false)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
exec := b.build(plan)
return exec
}
@@ -1322,7 +1323,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
hashCols: tc.outerHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope)},
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
@@ -1388,7 +1389,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
12 changes: 11 additions & 1 deletion executor/builder.go
@@ -85,6 +85,7 @@ type executorBuilder struct {
Ti *TelemetryInfo
// ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly.
explicitStaleness bool
txnScope string
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
@@ -95,13 +96,14 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, explicitStaleness bool) *executorBuilder {
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, explicitStaleness bool, txnScope string) *executorBuilder {
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
snapshotTS: snapshotTS,
explicitStaleness: explicitStaleness,
txnScope: txnScope,
}
}

@@ -2679,6 +2681,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
isStaleness: b.explicitStaleness,
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
@@ -2950,6 +2954,8 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
isStaleness: b.explicitStaleness,
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
@@ -3566,6 +3572,8 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetTxnScope(e.txnScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetFromInfoSchema(e.ctx.GetInfoSchema()).
Build()
@@ -4078,6 +4086,8 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
startTS: startTS,
txnScope: b.txnScope,
isStaleness: b.explicitStaleness,
keepOrder: plan.KeepOrder,
desc: plan.Desc,
lock: plan.Lock,
1 change: 1 addition & 0 deletions executor/compiler.go
@@ -72,6 +72,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
GoCtx: ctx,
SnapshotTS: ret.LastSnapshotTS,
ExplicitStaleness: ret.ExplicitStaleness,
TxnScope: ret.TxnScope,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
LowerPriority: lowerPriority,
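
Finally, a hypothetical end-to-end sketch of what the plumbing above enables (the suite type, its store field, and the cluster/label setup are assumptions, not part of this commit): with txn_scope set to local, a stale read should now be routed to replicas in the session's own DC automatically.

package executor_test

import (
	. "github.com/pingcap/check"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/util/testkit"
)

// testStaleReadSuite is a hypothetical gocheck suite; its store would be
// created in a SetUpSuite method (omitted here), as the existing executor
// suites do.
type testStaleReadSuite struct {
	store kv.Storage
}

func (s *testStaleReadSuite) TestLocalScopeStaleRead(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	tk.MustExec("create table t (id int primary key)")
	tk.MustExec("insert into t values (1), (2)")
	// Pin the session to its local scope; the DC is taken from the server's zone label.
	tk.MustExec("set @@txn_scope = 'local'")
	// The stale read below should now be served by replicas in that DC rather
	// than by leaders only; the timestamp expression is only illustrative.
	tk.MustQuery("select * from t as of timestamp now() - interval 1 second where id in (1, 2)")
}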