Skip to content

Commit

Permalink
fix concurrency problem (pingcap#117)
Browse files Browse the repository at this point in the history
* resolve consistency problem

* support dump follower status for mysql and mariaDB

* revise metadata save point

* address comment
  • Loading branch information
lichunzhu authored Jul 10, 2020
1 parent e98f308 commit 19ba921
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 75 deletions.
80 changes: 53 additions & 27 deletions dumpling/v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return errors.New("snapshot consistency is not supported for this server")
}
hasTiKV, err := CheckTiDBWithTiKV(pool)
if err != nil {
return err
}
if hasTiKV {
conf.SessionParams["tidb_snapshot"] = conf.Snapshot
if conf.Consistency == "snapshot" {
hasTiKV, err := CheckTiDBWithTiKV(pool)
if err != nil {
return err
}
if hasTiKV {
conf.SessionParams["tidb_snapshot"] = conf.Snapshot
}
}
}

Expand All @@ -103,26 +105,23 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}
m := newGlobalMetadata(conf.OutputDirPath)
// write metadata even if dump failed
defer m.writeGlobalMetaData()

if !conf.NoViews {
views, err := listAllViews(pool, databases)
// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
if conf.Consistency == "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
return err
}
conf.Tables.Merge(views)
}

filterTables(conf)

conCtrl, err := NewConsistencyController(conf, pool)
if err != nil {
return err
Expand All @@ -131,13 +130,17 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

m := newGlobalMetadata(conf.OutputDirPath)
// write metadata even if dump failed
defer m.writeGlobalMetaData()
m.recordStartTime(time.Now())
err = m.getGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
// for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
if conf.Consistency != "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
return err
}
}

var writer Writer
Expand Down Expand Up @@ -197,6 +200,29 @@ func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer)
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.DB) error {
databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}

if !conf.NoViews {
views, err := listAllViews(pool, databases)
if err != nil {
return err
}
conf.Tables.Merge(views)
}

filterTables(conf)
return nil
}

func dumpSql(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
tableIR, err := SelectFromSql(conf, db)
if err != nil {
Expand Down
130 changes: 91 additions & 39 deletions dumpling/v4/export/metadata.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package export

import (
"bytes"
"database/sql"
"errors"
"fmt"
"path"
"strings"
"time"
)

type globalMetadata struct {
logFile string
pos string
gtidSet string
buffer bytes.Buffer

filePath string
startTime time.Time
finishTime time.Time
filePath string
}

const (
Expand All @@ -31,43 +30,25 @@ const (
func newGlobalMetadata(outputDir string) *globalMetadata {
return &globalMetadata{
filePath: path.Join(outputDir, metadataPath),
buffer: bytes.Buffer{},
}
}

func (m globalMetadata) String() string {
str := ""
if m.startTime.IsZero() {
return str
}
str += "Started dump at: " + m.startTime.Format(metadataTimeLayout) + "\n"

str += "SHOW MASTER STATUS:\n"
if m.logFile != "" {
str += "\t\tLog: " + m.logFile + "\n"
}
if m.pos != "" {
str += "\t\tPos: " + m.pos + "\n"
}
if m.gtidSet != "" {
str += "\t\tGTID:" + m.gtidSet + "\n"
}

if m.finishTime.IsZero() {
return str
}
str += "Finished dump at: " + m.finishTime.Format(metadataTimeLayout) + "\n"
return str
return m.buffer.String()
}

func (m *globalMetadata) recordStartTime(t time.Time) {
m.startTime = t
m.buffer.WriteString("Started dump at: " + t.Format(metadataTimeLayout) + "\n")
}

func (m *globalMetadata) recordFinishTime(t time.Time) {
m.finishTime = t
m.buffer.WriteString("Finished dump at: " + t.Format(metadataTimeLayout) + "\n")
}

func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) error {
func (m *globalMetadata) recordGlobalMetaData(db *sql.DB, serverType ServerType) error {
// get master status info
m.buffer.WriteString("SHOW MASTER STATUS:\n")
switch serverType {
// For MySQL:
// mysql> SHOW MASTER STATUS;
Expand All @@ -91,9 +72,15 @@ func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) er
if err != nil {
return err
}
m.logFile = str[fileFieldIndex]
m.pos = str[posFieldIndex]
m.gtidSet = str[gtidSetFieldIndex]
if logFile := str[fileFieldIndex]; logFile != "" {
m.buffer.WriteString("\tLog: " + logFile + "\n")
}
if pos := str[posFieldIndex]; pos != "" {
m.buffer.WriteString("\tPos: " + pos + "\n")
}
if gtidSet := str[gtidSetFieldIndex]; gtidSet != "" {
m.buffer.WriteString("\tGTID:" + gtidSet + "\n")
}
// For MariaDB:
// SHOW MASTER STATUS;
// +--------------------+----------+--------------+------------------+
Expand All @@ -113,16 +100,81 @@ func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) er
if err != nil {
return err
}
m.logFile = str[fileFieldIndex]
m.pos = str[posFieldIndex]
err = db.QueryRow("SELECT @@global.gtid_binlog_pos").Scan(&m.gtidSet)
if logFile := str[fileFieldIndex]; logFile != "" {
m.buffer.WriteString("\tLog: " + logFile + "\n")
}
if pos := str[posFieldIndex]; pos != "" {
m.buffer.WriteString("\tPos: " + pos + "\n")
}
var gtidSet string
err = db.QueryRow("SELECT @@global.gtid_binlog_pos").Scan(&gtidSet)
if err != nil {
return err
}
if gtidSet != "" {
m.buffer.WriteString("\tGTID:" + gtidSet + "\n")
}
default:
return errors.New("unsupported serverType" + serverType.String() + "for getGlobalMetaData")
return errors.New("unsupported serverType" + serverType.String() + "for recordGlobalMetaData")
}
m.buffer.WriteString("\n")
if serverType == ServerTypeTiDB {
return nil
}
// get follower status info
var (
isms bool
query string
)
if err := simpleQuery(db, "SELECT @@default_master_connection", func(rows *sql.Rows) error {
isms = true
return nil
}); err != nil {
isms = false
}
return nil
if isms {
query = "SHOW ALL SLAVES STATUS"
} else {
query = "SHOW SLAVE STATUS"
}
return simpleQuery(db, query, func(rows *sql.Rows) error {
cols, err := rows.Columns()
if err != nil {
return err
}
data := make([]string, len(cols))
args := make([]interface{}, 0, len(cols))
for i := range data {
args = append(args, &data[i])
}
if err := rows.Scan(args...); err != nil {
return err
}
var connName, pos, logFile, host, gtidSet string
for i, col := range cols {
col = strings.ToLower(col)
switch col {
case "connection_name":
connName = data[i]
case "exec_master_log_pos":
pos = data[i]
case "relay_master_log_file":
logFile = data[i]
case "master_host":
host = data[i]
case "executed_gtid_set":
gtidSet = data[i]
}
}
if len(host) > 0 {
m.buffer.WriteString("SHOW SLAVE STATUS:\n")
if isms {
m.buffer.WriteString("\tConnection name: " + connName + "\n")
}
fmt.Fprintf(&m.buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet)
}
return nil
})
}

func (m *globalMetadata) writeGlobalMetaData() error {
Expand Down
Loading

0 comments on commit 19ba921

Please sign in to comment.