diff --git a/executor/builder.go b/executor/builder.go
index 73c70e2bb5d74..258e26018141a 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1354,12 +1354,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
 			},
 		}
 	case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
+		retriever := &slowQueryRetriever{
+			table:      v.Table,
+			outputCols: v.Columns,
+		}
+		if v.Extractor != nil {
+			retriever.extractor = v.Extractor.(*plannercore.SlowQueryExtractor)
+		}
 		return &MemTableReaderExec{
 			baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
-			retriever: &SlowQueryRetriever{
-				table:      v.Table,
-				outputCols: v.Columns,
-			},
+			retriever:    retriever,
 		}
 	}
 }
diff --git a/executor/slow_query.go b/executor/slow_query.go
index b3f94ce0f555e..2294cc5a00cf7 100644
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -18,14 +18,19 @@ import (
 	"context"
 	"io"
 	"os"
+	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/parser/auth"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/parser/terror"
 	"github.com/pingcap/tidb/infoschema"
+	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/privilege"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -37,28 +42,34 @@ import (
 	"go.uber.org/zap"
 )
 
-//SlowQueryRetriever is used to read slow log data.
-type SlowQueryRetriever struct {
+// slowQueryRetriever is used to read slow log data.
+type slowQueryRetriever struct {
 	table       *model.TableInfo
 	outputCols  []*model.ColumnInfo
 	retrieved   bool
 	initialized bool
-	file        *os.File
+	extractor   *plannercore.SlowQueryExtractor
+	files       []logFile
+	fileIdx     int
 	fileLine    int
+	checker     *slowLogChecker
 }
 
-func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
+func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
 	if e.retrieved {
 		return nil, nil
 	}
 	if !e.initialized {
-		var err error
-		e.file, err = os.Open(sctx.GetSessionVars().SlowQueryFile)
+		err := e.initialize(sctx)
 		if err != nil {
 			return nil, err
 		}
-		e.initialized = true
 	}
+	if len(e.files) == 0 || e.fileIdx >= len(e.files) {
+		e.retrieved = true
+		return nil, nil
+	}
+
 	rows, err := e.dataForSlowLog(sctx)
 	if err != nil {
 		return nil, err
@@ -77,9 +88,29 @@ func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte
 	return retRows, nil
 }
 
-func (e *SlowQueryRetriever) close() error {
-	if e.file != nil {
-		err := e.file.Close()
+func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error {
+	var err error
+	var hasProcessPriv bool
+	if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
+		hasProcessPriv = pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv)
+	}
+	e.checker = &slowLogChecker{
+		hasProcessPriv: hasProcessPriv,
+		user:           sctx.GetSessionVars().User,
+	}
+	if e.extractor != nil {
+		e.checker.enableTimeCheck = e.extractor.Enable
+		e.checker.startTime = e.extractor.StartTime
+		e.checker.endTime = e.extractor.EndTime
+	}
+	e.initialized = true
+	e.files, err = e.getAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile)
+	return err
+}
+
+func (e *slowQueryRetriever) close() error {
+	for _, f := range e.files {
+		err := f.file.Close()
 		if err != nil {
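+			// A failure to close one file is only logged, so the remaining files still get closed.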
 			logutil.BgLogger().Error("close slow log file failed.", zap.Error(err))
 		}
@@ -87,59 +118,69 @@ func (e *SlowQueryRetriever) close() error {
 	return nil
 }
 
-func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
-	var hasProcessPriv bool
-	if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
-		if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
-			hasProcessPriv = true
-		}
-	}
-	user := ctx.GetSessionVars().User
-	checkValid := func(userName string) bool {
-		if !hasProcessPriv && user != nil && userName != user.Username {
-			return false
-		}
-		return true
-	}
-	rows, fileLine, err := ParseSlowLog(ctx, bufio.NewReader(e.file), e.fileLine, 1024, checkValid)
+func (e *slowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
+	reader := bufio.NewReader(e.files[e.fileIdx].file)
+	rows, err := e.parseSlowLog(ctx, reader, 1024)
 	if err != nil {
-		if err == io.EOF {
-			e.retrieved = true
-		} else {
-			return nil, err
-		}
+		return nil, err
 	}
-	e.fileLine = fileLine
 	if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
 		return infoschema.AppendHostInfoToRows(rows)
 	}
 	return rows, nil
 }
 
-type checkValidFunc func(string) bool
+type slowLogChecker struct {
+	// The fields below are used to check privileges.
+	hasProcessPriv bool
+	user           *auth.UserIdentity
+	// The fields below are used to check whether the slow log time is valid.
+	enableTimeCheck bool
+	startTime       time.Time
+	endTime         time.Time
+}
+
+func (sc *slowLogChecker) hasPrivilege(userName string) bool {
+	return sc.hasProcessPriv || sc.user == nil || userName == sc.user.Username
+}
+
+func (sc *slowLogChecker) isTimeValid(t time.Time) bool {
+	if sc.enableTimeCheck && (t.Before(sc.startTime) || t.After(sc.endTime)) {
+		return false
+	}
+	return true
+}
 
-// ParseSlowLog exports for testing.
 // TODO: optimize for parse huge log-file.
-func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checkValid checkValidFunc) ([][]types.Datum, int, error) {
+func (e *slowQueryRetriever) parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, maxRow int) ([][]types.Datum, error) {
 	var rows [][]types.Datum
+	var st *slowQueryTuple
 	startFlag := false
-	lineNum := fileLine
 	tz := ctx.GetSessionVars().Location()
-	var st *slowQueryTuple
 	for {
 		if len(rows) >= maxRow {
-			return rows, lineNum, nil
+			return rows, nil
 		}
-		lineNum++
+		e.fileLine++
 		lineByte, err := getOneLine(reader)
 		if err != nil {
-			return rows, lineNum, err
+			if err == io.EOF {
+				e.fileIdx++
+				e.fileLine = 0
+				if e.fileIdx >= len(e.files) {
+					e.retrieved = true
+					return rows, nil
+				}
+				reader = bufio.NewReader(e.files[e.fileIdx].file)
+				continue
+			}
+			return rows, err
 		}
 		line := string(hack.String(lineByte))
 		// Check slow log entry start flag.
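+		// A line starting with variable.SlowLogStartPrefixStr ("# Time: ") opens a new slow log entry.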
 		if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
 			st = &slowQueryTuple{}
-			valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checkValid)
+			valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], e.fileLine, e.checker)
 			if err != nil {
 				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 				continue
@@ -163,7 +204,7 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow
 				if strings.HasSuffix(field, ":") {
 					field = field[:len(field)-1]
 				}
-				valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, checkValid)
+				valid, err := st.setFieldValue(tz, field, fieldValues[i+1], e.fileLine, e.checker)
 				if err != nil {
 					ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 					continue
@@ -175,12 +216,12 @@ func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow
 			}
 		} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
 			// Get the sql string, and mark the start flag to false.
-			_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checkValid)
+			_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.fileLine, e.checker)
 			if err != nil {
 				ctx.GetSessionVars().StmtCtx.AppendWarning(err)
 				continue
 			}
-			if checkValid == nil || checkValid(st.user) {
+			if e.checker.hasPrivilege(st.user) {
 				rows = append(rows, st.convertToDatumRow())
 			}
 			startFlag = false
@@ -271,7 +312,7 @@ type slowQueryTuple struct {
 	planDigest string
 }
 
-func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checkValid checkValidFunc) (valid bool, err error) {
+func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string, lineNum int, checker *slowLogChecker) (valid bool, err error) {
 	valid = true
 	switch field {
 	case variable.SlowLogTimeStr:
@@ -282,6 +323,9 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string,
 		if st.time.Location() != tz {
 			st.time = st.time.In(tz)
 		}
+		if checker != nil {
+			valid = checker.isTimeValid(st.time)
+		}
 	case variable.SlowLogTxnStartTSStr:
 		st.txnStartTs, err = strconv.ParseUint(value, 10, 64)
 	case variable.SlowLogUserStr:
@@ -292,8 +336,8 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string,
 		if len(field) > 1 {
 			st.host = fields[1]
 		}
-		if checkValid != nil {
-			valid = checkValid(st.user)
+		if checker != nil {
+			valid = checker.hasPrivilege(st.user)
 		}
 	case variable.SlowLogConnIDStr:
 		st.connID, err = strconv.ParseUint(value, 10, 64)
@@ -465,3 +509,151 @@ func ParseTime(s string) (time.Time, error) {
 	}
 	return t, err
 }
+
+type logFile struct {
+	file       *os.File  // The opened file handle
+	start, end time.Time // The start/end time of the log file
+}
+
+// getAllFiles is used to get all the slow-log files that need to be parsed.
+func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) {
+	if e.extractor == nil || !e.extractor.Enable {
+		file, err := os.Open(logFilePath)
+		if err != nil {
+			return nil, err
+		}
+		return []logFile{{file: file}}, nil
+	}
+	var logFiles []logFile
+	logDir := filepath.Dir(logFilePath)
+	ext := filepath.Ext(logFilePath)
+	prefix := logFilePath[:len(logFilePath)-len(ext)]
+	handleErr := func(err error) error {
+		// Ignore the error and append a warning for usability.
+		if err != io.EOF {
+			sctx.GetSessionVars().StmtCtx.AppendWarning(err)
+		}
+		return nil
+	}
+	err := filepath.Walk(logDir, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return handleErr(err)
+		}
+		if info.IsDir() {
+			return nil
+		}
+		// All rotated log files have the same prefix as the original file.
+		if !strings.HasPrefix(path, prefix) {
+			return nil
+		}
+		file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
+		if err != nil {
+			return handleErr(err)
+		}
+		skip := false
+		defer func() {
+			if !skip {
+				terror.Log(file.Close())
+			}
+		}()
+		// Get the file start time.
+		fileStartTime, err := e.getFileStartTime(file)
+		if err != nil {
+			return handleErr(err)
+		}
+		if fileStartTime.After(e.extractor.EndTime) {
+			return nil
+		}
+
+		// Get the file end time.
+		fileEndTime, err := e.getFileEndTime(file)
+		if err != nil {
+			return handleErr(err)
+		}
+		if fileEndTime.Before(e.extractor.StartTime) {
+			return nil
+		}
+		_, err = file.Seek(0, io.SeekStart)
+		if err != nil {
+			return handleErr(err)
+		}
+		logFiles = append(logFiles, logFile{
+			file:  file,
+			start: fileStartTime,
+			end:   fileEndTime,
+		})
+		skip = true
+		return nil
+	})
+	// Sort by start time.
+	sort.Slice(logFiles, func(i, j int) bool {
+		return logFiles[i].start.Before(logFiles[j].start)
+	})
+	return logFiles, err
+}
+
+func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) {
+	var t time.Time
+	_, err := file.Seek(0, io.SeekStart)
+	if err != nil {
+		return t, err
+	}
+	reader := bufio.NewReader(file)
+	maxNum := 128
+	for {
+		lineByte, err := getOneLine(reader)
+		if err != nil {
+			return t, err
+		}
+		line := string(lineByte)
+		if strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
+			return ParseTime(line[len(variable.SlowLogStartPrefixStr):])
+		}
+		maxNum--
+		if maxNum <= 0 {
+			break
+		}
+	}
+	return t, errors.Errorf("malformed slow query file %v", file.Name())
+}
+
+func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) {
+	var t time.Time
+	stat, err := file.Stat()
+	if err != nil {
+		return t, err
+	}
+	fileSize := stat.Size()
+	cursor := int64(0)
+	line := make([]byte, 0, 64)
+	maxLineNum := 128
+	for {
+		cursor--
+		_, err := file.Seek(cursor, io.SeekEnd)
+		if err != nil {
+			return t, err
+		}
+
+		char := make([]byte, 1)
+		_, err = file.Read(char)
+		if err != nil {
+			return t, err
+		}
+		// If we have found a complete line.
+		if cursor != -1 && (char[0] == '\n' || char[0] == '\r') {
+			for i, j := 0, len(line)-1; i < j; i, j = i+1, j-1 {
+				line[i], line[j] = line[j], line[i]
+			}
+			lineStr := string(line)
+			lineStr = strings.TrimSpace(lineStr)
+			if strings.HasPrefix(lineStr, variable.SlowLogStartPrefixStr) {
+				return ParseTime(lineStr[len(variable.SlowLogStartPrefixStr):])
+			}
+			line = line[:0]
+			maxLineNum--
+		}
+		line = append(line, char[0])
+		if cursor == -fileSize || maxLineNum <= 0 {
+			return t, errors.Errorf("malformed slow query file %v", file.Name())
+		}
+	}
+}
diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go
index 07c654e868ffe..cd02389897496 100644
--- a/executor/slow_query_test.go
+++ b/executor/slow_query_test.go
@@ -11,17 +11,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package executor_test
+package executor
 
 import (
 	"bufio"
 	"bytes"
 	"io"
+	"os"
 	"strings"
 	"time"
 
 	. "github.com/pingcap/check"
-	"github.com/pingcap/tidb/executor"
+	"github.com/pingcap/parser/terror"
+	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
@@ -29,15 +31,18 @@ import (
 	"github.com/pingcap/tidb/util/mock"
 )
 
-func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, checkValid func(string) bool) ([][]types.Datum, int, error) {
-	rows, lineNum, err := executor.ParseSlowLog(ctx, reader, 0, 1024, checkValid)
+func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) {
+	retriever := &slowQueryRetriever{}
+	// Ignoring the error is OK for the test.
+	terror.Log(retriever.initialize(ctx))
+	rows, err := retriever.parseSlowLog(ctx, reader, 1024)
 	if err == io.EOF {
 		err = nil
 	}
-	return rows, lineNum, err
+	return rows, err
 }
 
-func (s *testSuite) TestParseSlowLogFile(c *C) {
+func (s *testExecSuite) TestParseSlowLogFile(c *C) {
 	slowLogStr :=
 		`# Time: 2019-04-28T15:24:04.309074+08:00
 # Txn_start_ts: 405888132465033227
@@ -56,13 +61,9 @@ select * from t;`
 	reader := bufio.NewReader(bytes.NewBufferString(slowLogStr))
 	loc, err := time.LoadLocation("Asia/Shanghai")
 	c.Assert(err, IsNil)
-	s.ctx = mock.NewContext()
-	s.ctx.GetSessionVars().TimeZone = loc
-	rows, _, err := parseSlowLog(s.ctx, reader, func(_ string) bool { return false })
-	c.Assert(err, IsNil)
-	c.Assert(len(rows), Equals, 0)
-	reader = bufio.NewReader(bytes.NewBufferString(slowLogStr))
-	rows, _, err = parseSlowLog(s.ctx, reader, nil)
+	ctx := mock.NewContext()
+	ctx.GetSessionVars().TimeZone = loc
+	rows, err := parseSlowLog(ctx, reader)
 	c.Assert(err, IsNil)
 	c.Assert(len(rows), Equals, 1)
 	recordString := ""
@@ -92,7 +93,7 @@ select a# from t;
 select * from t;
 `)
 	reader = bufio.NewReader(slowLog)
-	_, _, err = parseSlowLog(s.ctx, reader, nil)
+	_, err = parseSlowLog(ctx, reader)
 	c.Assert(err, IsNil)
 
 	// test for time format compatibility.
@@ -103,7 +104,7 @@ select * from t;
 select * from t;
 `)
 	reader = bufio.NewReader(slowLog)
-	rows, _, err = parseSlowLog(s.ctx, reader, nil)
+	rows, err = parseSlowLog(ctx, reader)
 	c.Assert(err, IsNil)
 	c.Assert(len(rows) == 2, IsTrue)
 	t0Str, err := rows[0][0].ToString()
@@ -124,13 +125,13 @@ select * from t;
 	sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1))
 	slowLog.WriteString(sql)
 	reader = bufio.NewReader(slowLog)
-	_, _, err = parseSlowLog(s.ctx, reader, nil)
+	_, err = parseSlowLog(ctx, reader)
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536")
 
 	variable.MaxOfMaxAllowedPacket = originValue
 	reader = bufio.NewReader(slowLog)
-	_, _, err = parseSlowLog(s.ctx, reader, nil)
+	_, err = parseSlowLog(ctx, reader)
 	c.Assert(err, IsNil)
 
 	// Add parse error check.
@@ -140,17 +141,17 @@ select * from t;
 select * from t;
 `)
 	reader = bufio.NewReader(slowLog)
-	_, _, err = parseSlowLog(s.ctx, reader, nil)
+	_, err = parseSlowLog(ctx, reader)
 	c.Assert(err, IsNil)
-	warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings()
+	warnings := ctx.GetSessionVars().StmtCtx.GetWarnings()
 	c.Assert(warnings, HasLen, 1)
 	c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Succ`, error: strconv.ParseBool: parsing \"abc\": invalid syntax")
 }
 
-func (s *testSuite) TestSlowLogParseTime(c *C) {
+func (s *testExecSuite) TestSlowLogParseTime(c *C) {
 	t1Str := "2019-01-24T22:32:29.313255+08:00"
 	t2Str := "2019-01-24T22:32:29.313255"
-	t1, err := executor.ParseTime(t1Str)
+	t1, err := ParseTime(t1Str)
 	c.Assert(err, IsNil)
 	loc, err := time.LoadLocation("Asia/Shanghai")
 	c.Assert(err, IsNil)
@@ -164,7 +165,7 @@ func (s *testSuite) TestSlowLogParseTime(c *C) {
 // TestFixParseSlowLogFile bugfix
 // sql select * from INFORMATION_SCHEMA.SLOW_QUERY limit 1;
 // ERROR 1105 (HY000): string "2019-05-12-11:23:29.61474688" doesn't has a prefix that matches format "2006-01-02-15:04:05.999999999 -0700", err: parsing time "2019-05-12-11:23:29.61474688" as "2006-01-02-15:04:05.999999999 -0700": cannot parse "" as "-0700"
-func (s *testSuite) TestFixParseSlowLogFile(c *C) {
+func (s *testExecSuite) TestFixParseSlowLogFile(c *C) {
 	slowLog := bytes.NewBufferString(
 		`# Time: 2019-05-12-11:23:29.614327491 +0800
 # Txn_start_ts: 405888132465033227
@@ -192,9 +193,9 @@ select * from t;`)
 	scanner := bufio.NewReader(slowLog)
 	loc, err := time.LoadLocation("Asia/Shanghai")
 	c.Assert(err, IsNil)
-	s.ctx = mock.NewContext()
-	s.ctx.GetSessionVars().TimeZone = loc
-	_, _, err = parseSlowLog(s.ctx, scanner, nil)
+	ctx := mock.NewContext()
+	ctx.GetSessionVars().TimeZone = loc
+	_, err = parseSlowLog(ctx, scanner)
 	c.Assert(err, IsNil)
 
 	// Test parser error.
@@ -204,10 +205,164 @@ select * from t;`)
 `)
 	scanner = bufio.NewReader(slowLog)
-	_, _, err = parseSlowLog(s.ctx, scanner, nil)
+	_, err = parseSlowLog(ctx, scanner)
 	c.Assert(err, IsNil)
-	warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings()
+	warnings := ctx.GetSessionVars().StmtCtx.GetWarnings()
 	c.Assert(warnings, HasLen, 1)
 	c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Txn_start_ts`, error: strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax")
 }
+
+func (s *testExecSuite) TestSlowQueryRetriever(c *C) {
+	writeFile := func(file string, data string) {
+		f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644)
+		c.Assert(err, IsNil)
+		_, err = f.Write([]byte(data))
+		c.Assert(f.Close(), IsNil)
+		c.Assert(err, IsNil)
+	}
+
+	logData0 := ""
+	logData1 := `
+# Time: 2020-02-15T18:00:01.000000+08:00
+select 1;
+# Time: 2020-02-15T19:00:05.000000+08:00
+select 2;`
+	logData2 := `
+# Time: 2020-02-16T18:00:01.000000+08:00
+select 3;
+# Time: 2020-02-16T18:00:05.000000+08:00
+select 4;`
+	logData3 := `
+# Time: 2020-02-16T19:00:00.000000+08:00
+select 5;
+# Time: 2020-02-17T18:00:05.000000+08:00
+select 6;`
+
+	fileName0 := "tidb-slow-2020-02-14T19-04-05.01.log"
+	fileName1 := "tidb-slow-2020-02-15T19-04-05.01.log"
+	fileName2 := "tidb-slow-2020-02-16T19-04-05.01.log"
+	fileName3 := "tidb-slow.log"
+	writeFile(fileName0, logData0)
+	writeFile(fileName1, logData1)
+	writeFile(fileName2, logData2)
+	writeFile(fileName3, logData3)
+	defer func() {
+		os.Remove(fileName0)
+		os.Remove(fileName1)
+		os.Remove(fileName2)
+		os.Remove(fileName3)
+	}()
+
+	cases := []struct {
+		startTime string
+		endTime   string
+		files     []string
+		queries   []string
+	}{
+		{
+			startTime: "2020-02-15T18:00:00.000000+08:00",
+			endTime:   "2020-02-17T20:00:00.000000+08:00",
+			files:     []string{fileName1, fileName2, fileName3},
+			queries: []string{
+				"select 1;",
+				"select 2;",
+				"select 3;",
+				"select 4;",
+				"select 5;",
+				"select 6;",
+			},
+		},
+		{
+			startTime: "2020-02-15T18:00:02.000000+08:00",
+			endTime:   "2020-02-16T20:00:00.000000+08:00",
+			files:     []string{fileName1, fileName2, fileName3},
+			queries: []string{
+				"select 2;",
+				"select 3;",
+				"select 4;",
+				"select 5;",
+			},
+		},
+		{
+			startTime: "2020-02-16T18:00:03.000000+08:00",
+			endTime:   "2020-02-16T18:59:00.000000+08:00",
+			files:     []string{fileName2},
+			queries: []string{
+				"select 4;",
+			},
+		},
+		{
+			startTime: "2020-02-16T18:00:03.000000+08:00",
+			endTime:   "2020-02-16T20:00:00.000000+08:00",
+			files:     []string{fileName2, fileName3},
+			queries: []string{
+				"select 4;",
+				"select 5;",
+			},
+		},
+		{
+			startTime: "2020-02-16T19:00:00.000000+08:00",
+			endTime:   "2020-02-17T17:00:00.000000+08:00",
+			files:     []string{fileName3},
+			queries: []string{
+				"select 5;",
+			},
+		},
+		{
+			startTime: "2010-01-01T00:00:00.000000+08:00",
+			endTime:   "2010-01-01T01:00:00.000000+08:00",
+			files:     []string{},
+		},
+		{
+			startTime: "2020-03-01T00:00:00.000000+08:00",
+			endTime:   "2010-03-01T01:00:00.000000+08:00",
+			files:     []string{},
+		},
+		{
+			startTime: "",
+			endTime:   "",
+			files:     []string{fileName3},
+			queries: []string{
+				"select 5;",
+				"select 6;",
+			},
+		},
+	}
+
+	loc, err := time.LoadLocation("Asia/Shanghai")
+	c.Assert(err, IsNil)
+	ctx := mock.NewContext()
+	ctx.GetSessionVars().TimeZone = loc
+	ctx.GetSessionVars().SlowQueryFile = fileName3
+	for i, cas := range cases {
+		extractor := &plannercore.SlowQueryExtractor{Enable: len(cas.startTime) > 0 && len(cas.endTime) > 0}
+		if extractor.Enable {
+			startTime, err := ParseTime(cas.startTime)
+			c.Assert(err, IsNil)
+			endTime, err := ParseTime(cas.endTime)
+			c.Assert(err, IsNil)
+			extractor.StartTime = startTime
+			extractor.EndTime = endTime
+		}
+		retriever := &slowQueryRetriever{extractor: extractor}
+		err := retriever.initialize(ctx)
+		c.Assert(err, IsNil)
+		comment := Commentf("case id: %v", i)
+		c.Assert(retriever.files, HasLen, len(cas.files), comment)
+		if len(retriever.files) > 0 {
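+			// parseSlowLog starts at the first matched file and advances fileIdx itself on EOF,
+			// so a single call reads across all matched files.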
+			rows, err := retriever.parseSlowLog(ctx, bufio.NewReader(retriever.files[0].file), 1024)
+			c.Assert(err, IsNil)
+			c.Assert(len(rows), Equals, len(cas.queries), comment)
+			for i, row := range rows {
+				c.Assert(row[len(row)-1].GetString(), Equals, cas.queries[i], comment)
+			}
+		}
+
+		for i, file := range retriever.files {
+			c.Assert(file.file.Name(), Equals, cas.files[i])
+			c.Assert(file.file.Close(), IsNil)
+		}
+	}
+}
diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go
index c664a443ee409..db378eb115199 100644
--- a/planner/core/logical_plan_builder.go
+++ b/planner/core/logical_plan_builder.go
@@ -2797,6 +2797,8 @@ func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tab
 			p.Extractor = &InspectionSummaryTableExtractor{}
 		case infoschema.TableMetricSummary, infoschema.TableMetricSummaryByLabel:
 			p.Extractor = newMetricTableExtractor()
+		case infoschema.TableSlowQuery:
+			p.Extractor = &SlowQueryExtractor{}
 		}
 	}
 	return p, nil
diff --git a/planner/core/memtable_predicate_extractor.go b/planner/core/memtable_predicate_extractor.go
index b27d09204f257..8e74f8dd02d00 100644
--- a/planner/core/memtable_predicate_extractor.go
+++ b/planner/core/memtable_predicate_extractor.go
@@ -382,6 +382,13 @@ func (helper extractHelper) parseQuantiles(quantileSet set.StringSet) []float64
 	return quantiles
 }
 
+func (helper extractHelper) convertToTime(t int64) time.Time {
+	if t == 0 || t == math.MaxInt64 {
+		return time.Now()
+	}
+	return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
+}
+
 // ClusterTableExtractor is used to extract some predicates of cluster table.
 type ClusterTableExtractor struct {
 	extractHelper
@@ -573,13 +580,6 @@ func (e *MetricTableExtractor) getTimeRange(start, end int64) (time.Time, time.T
 	return startTime, endTime
 }
 
-func (e *MetricTableExtractor) convertToTime(t int64) time.Time {
-	if t == 0 || t == math.MaxInt64 {
-		return time.Now()
-	}
-	return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
-}
-
 // InspectionResultTableExtractor is used to extract some predicates of `inspection_result`
 type InspectionResultTableExtractor struct {
 	extractHelper
@@ -640,3 +640,54 @@ func (e *InspectionSummaryTableExtractor) Extract(
 	e.MetricNames = metricNames
 	return remained
 }
+
+// SlowQueryExtractor is used to extract some predicates of `slow_query`
+type SlowQueryExtractor struct {
+	extractHelper
+
+	SkipRequest bool
+	StartTime   time.Time
+	EndTime     time.Time
+	// If Enable is true, the executor should use the time range to locate the slow-log files that need to be parsed.
+	// If Enable is false, the executor should keep the behavior compatible with before, which only parses the
+	// current slow-log file.
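+	// Enable is set to true by setTimeRange once an explicit time range has been extracted from the predicates.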
+	Enable bool
+}
+
+// Extract implements the MemTablePredicateExtractor Extract interface
+func (e *SlowQueryExtractor) Extract(
+	ctx sessionctx.Context,
+	schema *expression.Schema,
+	names []*types.FieldName,
+	predicates []expression.Expression,
+) []expression.Expression {
+	remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, predicates, "time", ctx.GetSessionVars().StmtCtx.TimeZone)
+	e.setTimeRange(startTime, endTime)
+	e.SkipRequest = e.Enable && e.StartTime.After(e.EndTime)
+	if e.SkipRequest {
+		return nil
+	}
+	return remained
+}
+
+func (e *SlowQueryExtractor) setTimeRange(start, end int64) {
+	const defaultSlowQueryDuration = 24 * time.Hour
+	var startTime, endTime time.Time
+	if start == 0 && end == 0 {
+		return
+	}
+	if start != 0 {
+		startTime = e.convertToTime(start)
+	}
+	if end != 0 {
+		endTime = e.convertToTime(end)
+	}
+	if start == 0 {
+		startTime = endTime.Add(-defaultSlowQueryDuration)
+	}
+	if end == 0 {
+		endTime = startTime.Add(defaultSlowQueryDuration)
+	}
+	e.StartTime, e.EndTime = startTime, endTime
+	e.Enable = true
+}
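For reviewers who want to try the new time-range semantics outside of TiDB, here is a minimal, self-contained sketch of the fill-in behavior that setTimeRange implements. The fillTimeRange helper is hypothetical and only for illustration; it is not part of this patch.

package main

import (
	"fmt"
	"time"
)

// fillTimeRange mirrors SlowQueryExtractor.setTimeRange: when only one bound
// is present, the other is derived using a default 24h window, and the
// returned flag corresponds to the extractor's Enable field.
func fillTimeRange(start, end time.Time) (time.Time, time.Time, bool) {
	const defaultSlowQueryDuration = 24 * time.Hour
	if start.IsZero() && end.IsZero() {
		return start, end, false // no time predicate: keep the old single-file behavior
	}
	if start.IsZero() {
		start = end.Add(-defaultSlowQueryDuration)
	}
	if end.IsZero() {
		end = start.Add(defaultSlowQueryDuration)
	}
	return start, end, true // enable time-based slow-log file location
}

func main() {
	end, _ := time.Parse(time.RFC3339, "2020-02-16T20:00:00+08:00")
	start, end, enable := fillTimeRange(time.Time{}, end)
	fmt.Println(start, end, enable) // start is 24h before end, enable is true
}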