Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Provide staleReadProcessor to process stale read #32699

Merged
merged 21 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 4 additions & 35 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
Expand All @@ -45,8 +46,10 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand All @@ -60,11 +63,7 @@ import (
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -3080,7 +3079,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
case *ast.BeginStmt:
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
if raw.AsOf != nil {
startTS, err := calculateTsExpr(b.ctx, raw.AsOf)
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf)
if err != nil {
return nil, err
}
Expand All @@ -3097,36 +3096,6 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
return p, nil
}

// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func calculateTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := evalAstExpr(sctx, asOfClause.TsExpr)
if err != nil {
return 0, err
}
toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp)
// We need at least the millionsecond here, so set fsp to 3.
toTypeTimestamp.Decimal = 3
tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp)
if err != nil {
return 0, err
}
tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location())
if err != nil {
return 0, err
}
return oracle.GoTimeToTS(tsTime), nil
}

func calculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) {
nowVal, err := expression.GetStmtTimestamp(sctx)
if err != nil {
return 0, err
}
tsVal := nowVal.Add(readStaleness)
minTsVal := expression.GetMinSafeTime(sctx)
return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil
}

func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) ([]visitInfo, error) {
// To use REVOKE, you must have the GRANT OPTION privilege,
// and you must have the privileges that you are granting.
Expand Down
136 changes: 26 additions & 110 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package core

import (
"context"
"fmt"
"math"
"strings"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand All @@ -39,6 +37,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -113,7 +112,12 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode {
// Preprocess resolves table names of the node, and checks some statements' validation.
// preprocessReturn used to extract the infoschema for the tableName and the timestamp from the asof clause.
func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error {
v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0), withName: make(map[string]interface{})}
v := preprocessor{
ctx: ctx,
tableAliasInJoin: make([]map[string]interface{}, 0),
withName: make(map[string]interface{}),
staleReadProcessor: staleread.NewStaleReadProcessor(ctx),
}
for _, optFn := range preprocessOpt {
optFn(&v)
}
Expand Down Expand Up @@ -184,6 +188,8 @@ type preprocessor struct {
tableAliasInJoin []map[string]interface{}
withName map[string]interface{}

staleReadProcessor staleread.Processor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I'm OK to make this change, does this mean that we may have more and more xxxProcessor fields in the future? Do we have any idea on how to simplify this part?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a staleReadProcessor here is to decouple the stale read code from preprocessor and expect to reuse it. It is only a choice for stale read and I think for other scenes the design will not be limited to 'xxxProcessor'.

For stale read, in the future, there is still only one processor to process stale read. However, we can have multiple subclasses inherits staleread.Processor and we can use one of them according to the exact scenario


// values that may be returned
*PreprocessorReturn
*PreprocessExecuteISUpdate
Expand Down Expand Up @@ -1446,7 +1452,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) {
return
}

p.handleAsOfAndReadTS(tn.AsOf)
p.handleAsOfAndReadTS(tn)
if p.err != nil {
return
}
Expand Down Expand Up @@ -1608,7 +1614,7 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) {
}

// handleAsOfAndReadTS tries to handle as of closure, or possibly read_ts.
func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) {
if p.stmtTp != TypeSelect {
return
}
Expand All @@ -1620,117 +1626,27 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
p.ctx.GetSessionVars().StmtCtx.IsStaleness = true
}
}()
// When statement is during the Txn, we check whether there exists AsOfClause. If exists, we will return error,
// otherwise we should directly set the return param from TxnCtx.
p.ReadReplicaScope = kv.GlobalReplicaScope
if p.ctx.GetSessionVars().InTxn() {
if node != nil {
p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
return
}
txnCtx := p.ctx.GetSessionVars().TxnCtx
p.ReadReplicaScope = txnCtx.TxnScope
// It means we meet following case:
// 1. start transaction read only as of timestamp ts
// 2. select statement
if txnCtx.IsStaleness {
p.LastSnapshotTS = txnCtx.StartTS
p.IsStaleness = txnCtx.IsStaleness
p.initedLastSnapshotTS = true
return
}
}
scope := config.GetTxnScopeFromConfig()
if p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && scope != kv.GlobalReplicaScope {
p.ReadReplicaScope = scope
}

// If the statement is in auto-commit mode, we will check whether there exists read_ts, if exists,
// we will directly use it. The txnScope will be defined by the zone label, if it is not set, we will use
// global txnScope directly.
readTS := p.ctx.GetSessionVars().TxnReadTS.UseTxnReadTS()
readStaleness := p.ctx.GetSessionVars().ReadStaleness
var ts uint64
switch {
case readTS > 0:
ts = readTS
if node != nil {
p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of")
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) {
return ts, nil
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node != nil:
// If we didn't use read_ts, and node isn't nil, it means we use 'select table as of timestamp ... '
// for stale read
// It means we meet following case:
// select statement with as of timestamp
ts, p.err = calculateTsExpr(p.ctx, node)
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsExpr(ctx, node)
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node == nil && readStaleness != 0:
// If both readTS and node is empty while the readStaleness isn't, it means we meet following situation:
// set @@tidb_read_staleness='-5';
// select * from t;
// Then the following select statement should be affected by the tidb_read_staleness in session.
ts, p.err = calculateTsWithReadStaleness(p.ctx, readStaleness)
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsWithReadStaleness(p.ctx, readStaleness)
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node == nil && readStaleness == 0:
// If both readTS and node is empty while the readStaleness is empty,
// setting p.ReadReplicaScope is necessary to verify the txn scope later
// because we may be in a local txn without using the Stale Read.
p.ReadReplicaScope = scope
if p.err = p.staleReadProcessor.OnSelectTable(tn); p.err != nil {
return
}

// If the select statement is related to multi tables, we should grantee that all tables use the same timestamp
if p.LastSnapshotTS != ts {
p.err = ErrAsOf.GenWithStack("can not set different time in the as of")
if p.initedLastSnapshotTS {
return
}
if p.LastSnapshotTS != 0 {
dom := domain.GetDomain(p.ctx)
is, err := dom.GetSnapshotInfoSchema(p.LastSnapshotTS)
// if infoschema is empty, LastSnapshotTS init failed
if err != nil {
p.err = err
return
}
if is == nil {
p.err = fmt.Errorf("can not get any information schema based on snapshotTS: %d", p.LastSnapshotTS)
return
}
p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is)

if p.IsStaleness = p.staleReadProcessor.IsStaleness(); p.IsStaleness {
p.LastSnapshotTS = p.staleReadProcessor.GetStalenessReadTS()
p.SnapshotTSEvaluator = p.staleReadProcessor.GetStalenessTSEvaluatorForPrepare()
p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema()
}

if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() {
p.ReadReplicaScope = config.GetTxnScopeFromConfig()
} else {
p.ReadReplicaScope = p.ctx.GetSessionVars().TxnCtx.TxnScope
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}

p.initedLastSnapshotTS = true
}

Expand Down
24 changes: 24 additions & 0 deletions sessiontxn/staleread/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staleread

import (
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/util/dbterror"
)

var (
errAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf)
)
30 changes: 30 additions & 0 deletions sessiontxn/staleread/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staleread

import (
"testing"

"github.com/pingcap/tidb/util/testbridge"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}
testbridge.SetupForCommonTest()
goleak.VerifyTestMain(m, opts...)
}
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
Loading