Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support next chunk for slow_query #14754

Merged
merged 11 commits into from
Feb 18, 2020
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
retriever: &SlowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
},
}
}
}
tb, _ := b.is.TableByID(v.Table.ID)
Expand Down
152 changes: 78 additions & 74 deletions infoschema/slow_log.go → executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema
package executor

import (
"bufio"
"context"
"io"
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -34,56 +37,57 @@ import (
"go.uber.org/zap"
)

var slowQueryCols = []columnInfo{
{variable.SlowLogTimeStr, mysql.TypeTimestamp, 26, 0, nil, nil},
{variable.SlowLogTxnStartTSStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogUserStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogHostStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogConnIDStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogQueryTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogParseTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCompileTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.PreWriteTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.BinlogPrewriteTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.CommitTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.GetCommitTSTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.CommitBackoffTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.BackoffTypesStr, mysql.TypeVarchar, 64, 0, nil, nil},
{execdetails.ResolveLockTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.LocalLatchWaitTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.WriteKeysStr, mysql.TypeLonglong, 22, 0, nil, nil},
{execdetails.WriteSizeStr, mysql.TypeLonglong, 22, 0, nil, nil},
{execdetails.PrewriteRegionStr, mysql.TypeLonglong, 22, 0, nil, nil},
{execdetails.TxnRetryStr, mysql.TypeLonglong, 22, 0, nil, nil},
{execdetails.ProcessTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.WaitTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.BackoffTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.LockKeysTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.RequestCountStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.TotalKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.ProcessKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogDBStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogIndexNamesStr, mysql.TypeVarchar, 100, 0, nil, nil},
{variable.SlowLogIsInternalStr, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogDigestStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogStatsInfoStr, mysql.TypeVarchar, 512, 0, nil, nil},
{variable.SlowLogCopProcAvg, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopProcP90, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopProcMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopProcAddr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogCopWaitAvg, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitP90, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitMax, mysql.TypeDouble, 22, 0, nil, nil},
{variable.SlowLogCopWaitAddr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogMemMax, mysql.TypeLonglong, 20, 0, nil, nil},
{variable.SlowLogSucc, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogPlan, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
{variable.SlowLogPlanDigest, mysql.TypeVarchar, 128, 0, nil, nil},
{variable.SlowLogPrevStmt, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeLongBlob, types.UnspecifiedLength, 0, nil, nil},
// SlowQueryRetriever is used to read slow log data.
// It is an incremental retriever: retrieve() opens the slow-query file
// lazily and remembers the line position between calls so the log can be
// read in chunks instead of all at once.
type SlowQueryRetriever struct {
	table       *model.TableInfo   // table metadata of the memtable being served
	outputCols  []*model.ColumnInfo // requested columns; may be a subset of table.Columns
	retrieved   bool               // set once the whole file has been consumed (EOF reached)
	initialized bool               // set after the slow-query file has been opened
	file        *os.File           // open handle on the session's slow-query file
	fileLine    int                // last line number parsed; resume point for the next chunk
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
// retrieve returns the next chunk of parsed slow-log rows. On the first call
// it opens the file named by the session's SlowQueryFile variable; once the
// log has been fully consumed (retrieved is set) it returns (nil, nil).
// The full rows are projected down to outputCols when the caller requested
// only a subset of the table's columns.
func (e *SlowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}
	if !e.initialized {
		f, err := os.Open(sctx.GetSessionVars().SlowQueryFile)
		if err != nil {
			return nil, err
		}
		e.file = f
		e.initialized = true
	}
	fullRows, err := e.dataForSlowLog(sctx)
	if err != nil {
		return nil, err
	}
	// Fast path: every column was requested, so no projection is needed.
	if len(e.outputCols) == len(e.table.Columns) {
		return fullRows, nil
	}
	projected := make([][]types.Datum, len(fullRows))
	for i := range fullRows {
		row := make([]types.Datum, len(e.outputCols))
		for j, col := range e.outputCols {
			row[j] = fullRows[i][col.Offset]
		}
		projected[i] = row
	}
	return projected, nil
}

// close releases the slow-log file handle if one was opened.
// A failure while closing is logged rather than propagated: the method
// deliberately always returns nil so that cleanup never fails the query.
func (e *SlowQueryRetriever) close() error {
	if e.file == nil {
		return nil
	}
	if err := e.file.Close(); err != nil {
		logutil.BgLogger().Error("close slow log file failed.", zap.Error(err))
	}
	return nil
}

func (e *SlowQueryRetriever) dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
Expand All @@ -97,50 +101,48 @@ func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
}
return true
}
return parseSlowLogFile(ctx.GetSessionVars().Location(), ctx.GetSessionVars().SlowQueryFile, checkValid)
}

// parseSlowLogFile uses to parse slow log file.
// TODO: Support parse multiple log-files.
func parseSlowLogFile(tz *time.Location, filePath string, checkValid checkValidFunc) ([][]types.Datum, error) {
file, err := os.Open(filePath)
rows, fileLine, err := ParseSlowLog(ctx, bufio.NewReader(e.file), e.fileLine, 1024, checkValid)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err = file.Close(); err != nil {
logutil.BgLogger().Error("close slow log file failed.", zap.String("file", filePath), zap.Error(err))
if err == io.EOF {
e.retrieved = true
} else {
return nil, err
}
}()

return ParseSlowLog(tz, bufio.NewReader(file), checkValid)
}
e.fileLine = fileLine
if e.table.Name.L == strings.ToLower(infoschema.ClusterTableSlowLog) {
return infoschema.AppendHostInfoToRows(rows)
}
return rows, nil
}

// checkValidFunc is a filter applied while parsing the slow log; it receives
// a parsed field value (e.g. the entry's user name) and reports whether the
// entry should be kept in the result set.
type checkValidFunc func(string) bool

// ParseSlowLog exports for testing.
// TODO: optimize for parse huge log-file.
func ParseSlowLog(tz *time.Location, reader *bufio.Reader, checkValid checkValidFunc) ([][]types.Datum, error) {
func ParseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, fileLine, maxRow int, checkValid checkValidFunc) ([][]types.Datum, int, error) {
var rows [][]types.Datum
startFlag := false
lineNum := fileLine
tz := ctx.GetSessionVars().Location()
var st *slowQueryTuple
lineNum := 0
for {
if len(rows) >= maxRow {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return rows, lineNum, nil
}
lineNum++
lineByte, err := getOneLine(reader)
if err != nil {
if err == io.EOF {
return rows, nil
}
return rows, err
return rows, lineNum, err
}
line := string(hack.String(lineByte))
// Check slow log entry start flag.
if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) {
st = &slowQueryTuple{}
valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], lineNum, checkValid)
if err != nil {
return rows, err
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if valid {
startFlag = true
Expand All @@ -163,7 +165,8 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader, checkValid checkValid
}
valid, err := st.setFieldValue(tz, field, fieldValues[i+1], lineNum, checkValid)
if err != nil {
return rows, err
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if !valid {
startFlag = false
Expand All @@ -174,7 +177,8 @@ func ParseSlowLog(tz *time.Location, reader *bufio.Reader, checkValid checkValid
// Get the sql string, and mark the start flag to false.
_, err = st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), lineNum, checkValid)
if err != nil {
return rows, err
ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if checkValid == nil || checkValid(st.user) {
rows = append(rows, st.convertToDatumRow())
Expand Down Expand Up @@ -381,7 +385,7 @@ func (st *slowQueryTuple) setFieldValue(tz *time.Location, field, value string,
}

func (st *slowQueryTuple) convertToDatumRow() []types.Datum {
record := make([]types.Datum, 0, len(slowQueryCols))
record := make([]types.Datum, 0, 64)
reafans marked this conversation as resolved.
Show resolved Hide resolved
record = append(record, types.NewTimeDatum(types.NewTime(types.FromGoTime(st.time), mysql.TypeDatetime, types.MaxFsp)))
record = append(record, types.NewUintDatum(st.txnStartTs))
record = append(record, types.NewStringDatum(st.user))
Expand Down
52 changes: 36 additions & 16 deletions infoschema/slow_log_test.go → executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema_test
package executor_test

import (
"bufio"
"bytes"
"io"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
)

// parseSlowLog is a test helper around executor.ParseSlowLog that reads up to
// 1024 rows from the start of the log and treats io.EOF (end of the data)
// as success rather than an error.
func parseSlowLog(ctx sessionctx.Context, reader *bufio.Reader, checkValid func(string) bool) ([][]types.Datum, int, error) {
	rows, lineNum, err := executor.ParseSlowLog(ctx, reader, 0, 1024, checkValid)
	if err == io.EOF {
		return rows, lineNum, nil
	}
	return rows, lineNum, err
}

func (s *testSuite) TestParseSlowLogFile(c *C) {
slowLogStr :=
`# Time: 2019-04-28T15:24:04.309074+08:00
Expand All @@ -44,11 +56,13 @@ select * from t;`
reader := bufio.NewReader(bytes.NewBufferString(slowLogStr))
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
rows, err := infoschema.ParseSlowLog(loc, reader, func(_ string) bool { return false })
s.ctx = mock.NewContext()
s.ctx.GetSessionVars().TimeZone = loc
rows, _, err := parseSlowLog(s.ctx, reader, func(_ string) bool { return false })
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 0)
reader = bufio.NewReader(bytes.NewBufferString(slowLogStr))
rows, err = infoschema.ParseSlowLog(loc, reader, nil)
rows, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)
recordString := ""
Expand Down Expand Up @@ -78,7 +92,7 @@ select a# from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader, nil)
_, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, IsNil)

// test for time format compatibility.
Expand All @@ -89,7 +103,7 @@ select * from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
rows, err = infoschema.ParseSlowLog(loc, reader, nil)
rows, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, IsNil)
c.Assert(len(rows) == 2, IsTrue)
t0Str, err := rows[0][0].ToString()
Expand All @@ -110,13 +124,13 @@ select * from t;
sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1))
slowLog.WriteString(sql)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader, nil)
_, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536")

variable.MaxOfMaxAllowedPacket = originValue
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader, nil)
_, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, IsNil)

// Add parse error check.
Expand All @@ -126,15 +140,17 @@ select * from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, reader, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Succ`, error: strconv.ParseBool: parsing \"abc\": invalid syntax")
_, _, err = parseSlowLog(s.ctx, reader, nil)
c.Assert(err, IsNil)
warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings()
c.Assert(warnings, HasLen, 1)
c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Succ`, error: strconv.ParseBool: parsing \"abc\": invalid syntax")
}

func (s *testSuite) TestSlowLogParseTime(c *C) {
t1Str := "2019-01-24T22:32:29.313255+08:00"
t2Str := "2019-01-24T22:32:29.313255"
t1, err := infoschema.ParseTime(t1Str)
t1, err := executor.ParseTime(t1Str)
c.Assert(err, IsNil)
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -176,7 +192,9 @@ select * from t;`)
scanner := bufio.NewReader(slowLog)
loc, err := time.LoadLocation("Asia/Shanghai")
c.Assert(err, IsNil)
_, err = infoschema.ParseSlowLog(loc, scanner, nil)
s.ctx = mock.NewContext()
s.ctx.GetSessionVars().TimeZone = loc
_, _, err = parseSlowLog(s.ctx, scanner, nil)
c.Assert(err, IsNil)

// Test parser error.
Expand All @@ -186,8 +204,10 @@ select * from t;`)
`)

scanner = bufio.NewReader(slowLog)
_, err = infoschema.ParseSlowLog(loc, scanner, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Txn_start_ts`, error: strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax")
_, _, err = parseSlowLog(s.ctx, scanner, nil)
c.Assert(err, IsNil)
warnings := s.ctx.GetSessionVars().StmtCtx.GetWarnings()
c.Assert(warnings, HasLen, 1)
c.Assert(warnings[0].Err.Error(), Equals, "Parse slow log at line 2 failed. Field: `Txn_start_ts`, error: strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax")

}
Loading