Skip to content

Commit

Permalink
pkg: support the TSO format for asof expression
Browse files Browse the repository at this point in the history
Signed-off-by: BornChanger <dawn_catcher@126.com>
  • Loading branch information
BornChanger committed Nov 7, 2023
1 parent 9d07f83 commit 032afaf
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
61 changes: 61 additions & 0 deletions pkg/executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,67 @@ func TestFlashbackWithSafeTs(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout"))
}

func TestFlashbackTSOWithSafeTs(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

// Set GC safe point.
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

time.Sleep(time.Second)
ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)
testcases := []struct {
name string
sql string
injectSafeTS uint64
// compareWithSafeTS will be 0 if FlashbackTS==SafeTS, -1 if FlashbackTS < SafeTS, and +1 if FlashbackTS > SafeTS.
compareWithSafeTS int
}{
{
name: "5 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts),
injectSafeTS: oracle.GoTimeToTS(flashbackTs),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
{
name: "5 seconds ago to now, safeTS 10 secs ago",
sql: fmt.Sprintf("flashback cluster to timestamp '%d'", ts),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(-10 * time.Second)),
compareWithSafeTS: 1,
},
}
for _, testcase := range testcases {
t.Log(testcase.name)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)))
if testcase.compareWithSafeTS == 1 {
start := time.Now()
tk.MustContainErrMsg(testcase.sql,
"cannot set flashback timestamp after min-resolved-ts")
// When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`.
require.Less(t, time.Since(start), time.Second)
} else {
tk.MustExec(testcase.sql)
}
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/changeFlashbackGetMinSafeTimeTimeout"))
}

func TestFlashbackRetryGetMinSafeTime(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
7 changes: 7 additions & 0 deletions pkg/sessiontxn/staleread/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package staleread

import (
"context"
"strconv"
"time"

"github.com/pingcap/failpoint"
Expand All @@ -30,6 +31,7 @@ import (
)

// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
// tsExpr could be an expression of TSO or a timestamp
func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) {
failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) {
Expand All @@ -49,6 +51,11 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr as
return 0, errAsOf.FastGenWithCause("as of timestamp cannot be NULL")
}

// if tsVal is TSO already, return it directly.
if tso, err := strconv.ParseUint(tsVal.GetString(), 10, 64); err == nil {
return tso, nil
}

toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp)
// We need at least the millionsecond here, so set fsp to 3.
toTypeTimestamp.SetDecimal(3)
Expand Down

0 comments on commit 032afaf

Please sign in to comment.