Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a column describing memory usage for table information_schema processlist (#10837) #10896

Merged
merged 5 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ func (s *testSuite) TestAggregation(c *C) {
result = tk.MustQuery("SELECT COALESCE ( + 1, cor0.col0 ) + - CAST( NULL AS DECIMAL ) FROM t2, t1 AS cor0, t2 AS cor1 GROUP BY cor0.col1")
result.Check(testkit.Rows("<nil>", "<nil>"))

result = tk.MustQuery("select count(*) from information_schema.columns")
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
// When adding new memory columns in information_schema, please update this variable.
columnCountOfAllInformationSchemaTables := "793"
result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))

tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1 (c1 int)")
Expand Down
16 changes: 8 additions & 8 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ type testExecSuite struct {

// mockSessionManager is a mocked session manager which is used for test.
type mockSessionManager struct {
PS []util.ProcessInfo
PS []*util.ProcessInfo
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo {
ret := make(map[uint64]util.ProcessInfo)
func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
ret[item.ID] = item
}
Expand All @@ -56,19 +56,19 @@ func (msm *mockSessionManager) Kill(cid uint64, query bool) {

func (s *testExecSuite) TestShowProcessList(c *C) {
// Compose schema.
names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
ftypes := []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString, mysql.TypeLonglong}
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
schema := buildSchema(names, ftypes)

// Compose a mocked session manager.
ps := make([]util.ProcessInfo, 0, 1)
pi := util.ProcessInfo{
ps := make([]*util.ProcessInfo, 0, 1)
pi := &util.ProcessInfo{
ID: 0,
User: "test",
Host: "127.0.0.1",
DB: "test",
Command: "select * from t",
Command: 't',
State: 1,
Info: "",
}
Expand Down
20 changes: 2 additions & 18 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,24 +203,8 @@ func (e *ShowExec) fetchShowProcessList() error {

pl := sm.ShowProcessList()
for _, pi := range pl {
var info string
if e.Full {
info = pi.Info
} else {
info = fmt.Sprintf("%.100v", pi.Info)
}

e.appendRow([]interface{}{
pi.ID,
pi.User,
pi.Host,
pi.DB,
pi.Command,
uint64(time.Since(pi.Time) / time.Second),
fmt.Sprintf("%d", pi.State),
info,
pi.Mem,
})
row := pi.ToRowForShow(e.Full)
e.appendRow(row)
}
return nil
}
Expand Down
19 changes: 4 additions & 15 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ var tableProcesslistCols = []columnInfo{
{"COMMAND", mysql.TypeVarchar, 16, mysql.NotNullFlag, "", nil},
{"TIME", mysql.TypeLong, 7, mysql.NotNullFlag, 0, nil},
{"STATE", mysql.TypeVarchar, 7, 0, nil, nil},
{"Info", mysql.TypeString, 512, 0, nil, nil},
{"INFO", mysql.TypeString, 512, 0, nil, nil},
{"MEM", mysql.TypeLonglong, 21, 0, nil, nil},
}

var tableTiDBIndexesCols = []columnInfo{
Expand Down Expand Up @@ -626,20 +627,8 @@ func dataForProcesslist(ctx sessionctx.Context) [][]types.Datum {
var records [][]types.Datum
pl := sm.ShowProcessList()
for _, pi := range pl {
var t uint64
if len(pi.Info) != 0 {
t = uint64(time.Since(pi.Time) / time.Second)
}
record := types.MakeDatums(
pi.ID,
pi.User,
pi.Host,
pi.DB,
pi.Command,
t,
fmt.Sprintf("%d", pi.State),
pi.Info,
)
rows := pi.ToRow()
record := types.MakeDatums(rows...)
records = append(records, record)
}
return records
Expand Down
109 changes: 109 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ import (
"fmt"
"os"
"strconv"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -87,6 +92,18 @@ func (s *testSuite) TestInfoschemaFieldValue(c *C) {
}, nil, nil), IsTrue)

tk1.MustQuery("select distinct(table_schema) from information_schema.tables").Check(testkit.Rows("INFORMATION_SCHEMA"))

// Fix issue 9836
sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)}
sm.processInfoMap[1] = &util.ProcessInfo{
ID: 1,
User: "root",
Host: "127.0.0.1",
Command: mysql.ComQuery,
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
tk.Se.SetSessionManager(sm)
tk.MustQuery("SELECT user,host,command FROM information_schema.processlist;").Check(testkit.Rows("root 127.0.0.1 Query"))
}

func (s *testSuite) TestDataForTableStatsField(c *C) {
Expand Down Expand Up @@ -227,6 +244,98 @@ func (s *testSuite) TestCharacterSetCollations(c *C) {
tk.MustExec("DROP DATABASE charset_collate_test")
}

var _ = Suite(&testTableSuite{})

type testTableSuite struct {
store kv.Storage
dom *domain.Domain
}

func (s *testTableSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.SetStatsLease(0)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testTableSuite) TearDownSuite(c *C) {
defer testleak.AfterTest(c)()
s.dom.Close()
s.store.Close()
}

type mockSessionManager struct {
processInfoMap map[uint64]*util.ProcessInfo
}

func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { return sm.processInfoMap }

func (sm *mockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
rs, ok := sm.processInfoMap[id]
return rs, ok
}

func (sm *mockSessionManager) Kill(connectionID uint64, query bool) {}

func (s *testTableSuite) TestSomeTables(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustQuery("select * from information_schema.COLLATION_CHARACTER_SET_APPLICABILITY where COLLATION_NAME='utf8mb4_bin';").Check(
testkit.Rows("utf8mb4_bin utf8mb4"))
tk.MustQuery("select * from information_schema.SESSION_VARIABLES where VARIABLE_NAME='tidb_retry_limit';").Check(testkit.Rows("tidb_retry_limit 10"))
// cherry-pick https://github.com/pingcap/tidb/pull/7831
//tk.MustQuery("select * from information_schema.ENGINES;").Check(testkit.Rows("InnoDB DEFAULT Supports transactions, row-level locking, and foreign keys YES YES YES"))
tk.MustQuery("select * from information_schema.TABLE_CONSTRAINTS where TABLE_NAME='gc_delete_range';").Check(testkit.Rows("def mysql delete_range_index mysql gc_delete_range UNIQUE"))
tk.MustQuery("select * from information_schema.KEY_COLUMN_USAGE where TABLE_NAME='stats_meta' and COLUMN_NAME='table_id';").Check(
testkit.Rows("def mysql tbl def mysql stats_meta table_id 1 <nil> <nil> <nil> <nil>"))
// https://github.com/pingcap/tidb/pull/9898
//tk.MustQuery("select * from information_schema.STATISTICS where TABLE_NAME='columns_priv' and COLUMN_NAME='Host';").Check(
// testkit.Rows("def mysql columns_priv 0 mysql PRIMARY 1 Host A <nil> <nil> <nil> BTREE "))
tk.MustQuery("select * from information_schema.USER_PRIVILEGES where PRIVILEGE_TYPE='Select';").Check(testkit.Rows("'root'@'%' def Select YES"))

sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)}
sm.processInfoMap[1] = &util.ProcessInfo{
ID: 1,
User: "user-1",
Host: "localhost",
DB: "information_schema",
Command: byte(1),
State: 1,
Info: "do something",
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
sm.processInfoMap[2] = &util.ProcessInfo{
ID: 2,
User: "user-2",
Host: "localhost",
DB: "test",
Command: byte(2),
State: 2,
Info: strings.Repeat("x", 101),
StmtCtx: tk.Se.GetSessionVars().StmtCtx,
}
tk.Se.SetSessionManager(sm)
tk.MustQuery("select * from information_schema.PROCESSLIST order by ID;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s 0", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s 0", strings.Repeat("x", 101)),
))
tk.MustQuery("SHOW PROCESSLIST;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s", strings.Repeat("x", 100)),
))
tk.MustQuery("SHOW FULL PROCESSLIST;").Sort().Check(
testkit.Rows(
fmt.Sprintf("1 user-1 localhost information_schema Quit 9223372036 1 %s", "do something"),
fmt.Sprintf("2 user-2 localhost test Init DB 9223372036 2 %s", strings.Repeat("x", 101)),
))
}

func (s *testSuite) TestSchemataCharacterSet(c *C) {
testleak.BeforeTest()
defer testleak.AfterTest(c)()
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ type LogicalUnionScan struct {
conditions []expression.Expression
}

// DataSource represents a tablescan without condition push down.
// DataSource represents a tableScan without condition push down.
type DataSource struct {
logicalSchemaProducer

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,9 +1873,9 @@ func buildShowSchema(s *ast.ShowStmt) (schema *expression.Schema) {
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar,
}
case ast.ShowProcessList:
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info", "Mem"}
names = []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
ftypes = []byte{mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar,
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString, mysql.TypeLonglong}
mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLong, mysql.TypeVarchar, mysql.TypeString}
case ast.ShowPumpStatus:
names = []string{"NodeID", "Address", "State", "Max_Commit_Ts", "Update_Time"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar}
Expand Down
2 changes: 1 addition & 1 deletion server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type QueryCtx interface {
Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool

// ShowProcess shows the information about the session.
ShowProcess() util.ProcessInfo
ShowProcess() *util.ProcessInfo

// GetSessionVars return SessionVars.
GetSessionVars() *variable.SessionVars
Expand Down
2 changes: 1 addition & 1 deletion server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns
}

// ShowProcess implements QueryCtx ShowProcess method.
func (tc *TiDBContext) ShowProcess() util.ProcessInfo {
func (tc *TiDBContext) ShowProcess() *util.ProcessInfo {
return tc.session.ShowProcess()
}

Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,9 @@ func (cc *clientConn) connectInfo() *variable.ConnectionInfo {
}

// ShowProcessList implements the SessionManager interface.
func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo {
func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
s.rwlock.RLock()
rs := make(map[uint64]util.ProcessInfo, len(s.clients))
rs := make(map[uint64]*util.ProcessInfo, len(s.clients))
for _, client := range s.clients {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
Expand Down
16 changes: 8 additions & 8 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import (
"golang.org/x/net/context"
)

// Session context
// Session context, it is consistent with the lifecycle of a client connection.
type Session interface {
sessionctx.Context
Status() uint16 // Flag of current status, such as autocommit.
Expand All @@ -89,7 +89,7 @@ type Session interface {
SetSessionManager(util.SessionManager)
Close()
Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool
ShowProcess() util.ProcessInfo
ShowProcess() *util.ProcessInfo
// PrePareTxnCtx is exported for test.
PrepareTxnCtx(context.Context)
// FieldList returns fields list of a table.
Expand Down Expand Up @@ -787,16 +787,17 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte) {
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
DB: s.sessionVars.CurrentDB,
Command: mysql.Command2Str[command],
Command: command,
Time: t,
State: s.Status(),
Info: sql,
StmtCtx: s.sessionVars.StmtCtx,
}
if s.sessionVars.User != nil {
pi.User = s.sessionVars.User.Username
pi.Host = s.sessionVars.User.Hostname
}
s.processInfo.Store(pi)
s.processInfo.Store(&pi)
}

func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode ast.StmtNode, stmt sqlexec.Statement, recordSets []sqlexec.RecordSet) ([]sqlexec.RecordSet, error) {
Expand Down Expand Up @@ -1528,12 +1529,11 @@ func (s *session) GetStore() kv.Storage {
return s.store
}

func (s *session) ShowProcess() util.ProcessInfo {
var pi util.ProcessInfo
func (s *session) ShowProcess() *util.ProcessInfo {
var pi *util.ProcessInfo
tmp := s.processInfo.Load()
if tmp != nil {
pi = tmp.(util.ProcessInfo)
pi.Mem = s.GetSessionVars().StmtCtx.MemTracker.BytesConsumed()
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
pi = tmp.(*util.ProcessInfo)
}
return pi
}
Expand Down
Loading