Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

fix concurrency problem #117

Merged
merged 4 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return errors.New("snapshot consistency is not supported for this server")
}
hasTiKV, err := CheckTiDBWithTiKV(pool)
if err != nil {
return err
}
if hasTiKV {
conf.SessionParams["tidb_snapshot"] = conf.Snapshot
if conf.Consistency == "snapshot" {
hasTiKV, err := CheckTiDBWithTiKV(pool)
if err != nil {
return err
}
if hasTiKV {
conf.SessionParams["tidb_snapshot"] = conf.Snapshot
}
}
}

Expand All @@ -103,26 +105,23 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}
m := newGlobalMetadata(conf.OutputDirPath)
// write metadata even if dump failed
defer m.writeGlobalMetaData()

if !conf.NoViews {
views, err := listAllViews(pool, databases)
// for consistency lock, we should lock tables at first to get the tables we want to lock & dump
// for consistency lock, record meta pos before lock tables because other tables may still be modified while locking tables
if conf.Consistency == "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
return err
}
conf.Tables.Merge(views)
}

filterTables(conf)

conCtrl, err := NewConsistencyController(conf, pool)
if err != nil {
return err
Expand All @@ -131,13 +130,17 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return err
}

m := newGlobalMetadata(conf.OutputDirPath)
// write metadata even if dump failed
defer m.writeGlobalMetaData()
m.recordStartTime(time.Now())
err = m.getGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
// for other consistencies, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot.
if conf.Consistency != "lock" {
m.recordStartTime(time.Now())
err = m.recordGlobalMetaData(pool, conf.ServerInfo.ServerType)
if err != nil {
log.Info("get global metadata failed", zap.Error(err))
}
if err = prepareTableListToDump(conf, pool); err != nil {
return err
}
}

var writer Writer
Expand Down Expand Up @@ -197,6 +200,29 @@ func dumpDatabases(ctx context.Context, conf *Config, db *sql.DB, writer Writer)
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.DB) error {
databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}

if !conf.NoViews {
views, err := listAllViews(pool, databases)
if err != nil {
return err
}
conf.Tables.Merge(views)
}

filterTables(conf)
return nil
}

func dumpSql(ctx context.Context, conf *Config, db *sql.DB, writer Writer) error {
tableIR, err := SelectFromSql(conf, db)
if err != nil {
Expand Down
130 changes: 91 additions & 39 deletions v4/export/metadata.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package export

import (
"bytes"
"database/sql"
"errors"
"fmt"
"path"
"strings"
"time"
)

type globalMetadata struct {
logFile string
pos string
gtidSet string
buffer bytes.Buffer

filePath string
startTime time.Time
finishTime time.Time
filePath string
}
Comment on lines 13 to 17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this to immediately write to a buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I think it's clearer.. WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh ok.


const (
Expand All @@ -31,43 +30,25 @@ const (
func newGlobalMetadata(outputDir string) *globalMetadata {
return &globalMetadata{
filePath: path.Join(outputDir, metadataPath),
buffer: bytes.Buffer{},
}
}

func (m globalMetadata) String() string {
str := ""
if m.startTime.IsZero() {
return str
}
str += "Started dump at: " + m.startTime.Format(metadataTimeLayout) + "\n"

str += "SHOW MASTER STATUS:\n"
if m.logFile != "" {
str += "\t\tLog: " + m.logFile + "\n"
}
if m.pos != "" {
str += "\t\tPos: " + m.pos + "\n"
}
if m.gtidSet != "" {
str += "\t\tGTID:" + m.gtidSet + "\n"
}

if m.finishTime.IsZero() {
return str
}
str += "Finished dump at: " + m.finishTime.Format(metadataTimeLayout) + "\n"
return str
return m.buffer.String()
}

func (m *globalMetadata) recordStartTime(t time.Time) {
m.startTime = t
m.buffer.WriteString("Started dump at: " + t.Format(metadataTimeLayout) + "\n")
}

func (m *globalMetadata) recordFinishTime(t time.Time) {
m.finishTime = t
m.buffer.WriteString("Finished dump at: " + t.Format(metadataTimeLayout) + "\n")
}

func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) error {
func (m *globalMetadata) recordGlobalMetaData(db *sql.DB, serverType ServerType) error {
// get master status info
m.buffer.WriteString("SHOW MASTER STATUS:\n")
switch serverType {
// For MySQL:
// mysql> SHOW MASTER STATUS;
Expand All @@ -91,9 +72,15 @@ func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) er
if err != nil {
return err
}
m.logFile = str[fileFieldIndex]
m.pos = str[posFieldIndex]
m.gtidSet = str[gtidSetFieldIndex]
if logFile := str[fileFieldIndex]; logFile != "" {
m.buffer.WriteString("\tLog: " + logFile + "\n")
}
if pos := str[posFieldIndex]; pos != "" {
m.buffer.WriteString("\tPos: " + pos + "\n")
}
if gtidSet := str[gtidSetFieldIndex]; gtidSet != "" {
m.buffer.WriteString("\tGTID:" + gtidSet + "\n")
}
// For MariaDB:
// SHOW MASTER STATUS;
// +--------------------+----------+--------------+------------------+
Expand All @@ -113,16 +100,81 @@ func (m *globalMetadata) getGlobalMetaData(db *sql.DB, serverType ServerType) er
if err != nil {
return err
}
m.logFile = str[fileFieldIndex]
m.pos = str[posFieldIndex]
err = db.QueryRow("SELECT @@global.gtid_binlog_pos").Scan(&m.gtidSet)
if logFile := str[fileFieldIndex]; logFile != "" {
m.buffer.WriteString("\tLog: " + logFile + "\n")
}
if pos := str[posFieldIndex]; pos != "" {
m.buffer.WriteString("\tPos: " + pos + "\n")
}
var gtidSet string
err = db.QueryRow("SELECT @@global.gtid_binlog_pos").Scan(&gtidSet)
if err != nil {
return err
}
if gtidSet != "" {
m.buffer.WriteString("\tGTID:" + gtidSet + "\n")
}
default:
return errors.New("unsupported serverType" + serverType.String() + "for getGlobalMetaData")
return errors.New("unsupported serverType" + serverType.String() + "for recordGlobalMetaData")
}
m.buffer.WriteString("\n")
if serverType == ServerTypeTiDB {
return nil
}
// get follower status info
var (
isms bool
query string
)
if err := simpleQuery(db, "SELECT @@default_master_connection", func(rows *sql.Rows) error {
isms = true
return nil
}); err != nil {
isms = false
}
return nil
if isms {
query = "SHOW ALL SLAVES STATUS"
} else {
query = "SHOW SLAVE STATUS"
}
return simpleQuery(db, query, func(rows *sql.Rows) error {
cols, err := rows.Columns()
if err != nil {
return err
}
data := make([]string, len(cols))
args := make([]interface{}, 0, len(cols))
for i := range data {
args = append(args, &data[i])
}
if err := rows.Scan(args...); err != nil {
return err
}
var connName, pos, logFile, host, gtidSet string
for i, col := range cols {
col = strings.ToLower(col)
switch col {
case "connection_name":
connName = data[i]
case "exec_master_log_pos":
pos = data[i]
case "relay_master_log_file":
logFile = data[i]
case "master_host":
host = data[i]
case "executed_gtid_set":
gtidSet = data[i]
}
}
if len(host) > 0 {
m.buffer.WriteString("SHOW SLAVE STATUS:\n")
if isms {
m.buffer.WriteString("\tConnection name: " + connName + "\n")
}
fmt.Fprintf(&m.buffer, "\tHost: %s\n\tLog: %s\n\tPos: %s\n\tGTID:%s\n\n", host, logFile, pos, gtidSet)
}
return nil
})
}

func (m *globalMetadata) writeGlobalMetaData() error {
Expand Down
Loading