Skip to content

Commit

Permalink
*: update dumpling log and pd usage (pingcap#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Aug 24, 2021
1 parent b90d1f2 commit 0aeb22d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 30 deletions.
19 changes: 10 additions & 9 deletions dumpling/v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
return err
}
tctx.L().Warn("fallback to concurrent dump tables using rows due to tidb error",
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
}

orderByClause, err := buildOrderByClause(conf, conn, db, tbl, meta.HasImplicitRowID())
Expand All @@ -482,7 +482,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met
if err != nil || field == "" {
// skip split chunk logic if not found proper field
tctx.L().Warn("fallback to sequential dump due to no proper field",
zap.String("database", db), zap.String("table", tbl), zap.Error(err))
zap.String("database", db), zap.String("table", tbl), log.ShortError(err))
return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1)
}

Expand Down Expand Up @@ -1023,7 +1023,7 @@ func startHTTPService(d *Dumper) error {
go func() {
err := startDumplingService(d.tctx, conf.StatusAddr)
if err != nil {
d.L().Warn("meet error when stopping dumpling http service", zap.Error(err))
d.L().Warn("meet error when stopping dumpling http service", log.ShortError(err))
}
}()
}
Expand Down Expand Up @@ -1080,16 +1080,17 @@ func tidbSetPDClientForGC(d *Dumper) error {
}
pdAddrs, err := GetPdAddrs(tctx, pool)
if err != nil {
return err
tctx.L().Warn("meet error while fetching pd addrs", log.ShortError(err))
return nil
}
if len(pdAddrs) > 0 {
doPdGC, err := checkSameCluster(tctx, pool, pdAddrs)
if err != nil {
tctx.L().Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
tctx.L().Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
} else if doPdGC {
pdClient, err := pd.NewClientWithContext(tctx, pdAddrs, pd.SecurityOption{})
if err != nil {
tctx.L().Warn("create pd client to control GC failed", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
tctx.L().Warn("create pd client to control GC failed", log.ShortError(err), zap.Strings("pdAddrs", pdAddrs))
}
d.tidbPDClientForGC = pdClient
}
Expand All @@ -1105,13 +1106,13 @@ func tidbGetSnapshot(d *Dumper) error {
if conf.Snapshot == "" && (doPdGC || consistency == "snapshot") {
conn, err := pool.Conn(tctx)
if err != nil {
tctx.L().Warn("cannot get snapshot from TiDB", zap.Error(err))
tctx.L().Warn("cannot get snapshot from TiDB", log.ShortError(err))
return nil
}
snapshot, err := getSnapshot(conn)
_ = conn.Close()
if err != nil {
tctx.L().Warn("cannot get snapshot from TiDB", zap.Error(err))
tctx.L().Warn("cannot get snapshot from TiDB", log.ShortError(err))
return nil
}
conf.Snapshot = snapshot
Expand Down Expand Up @@ -1186,7 +1187,7 @@ func setSessionParam(d *Dumper) error {
if consistency == consistencyTypeSnapshot {
conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool)
if err != nil {
return err
d.L().Warn("fail to check whether TiDB has TiKV", log.ShortError(err))
}
if conf.ServerInfo.HasTiKV {
sessionParam["tidb_snapshot"] = snapshot
Expand Down
5 changes: 3 additions & 2 deletions dumpling/v4/export/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"strings"
"time"

"github.com/pingcap/dumpling/v4/log"

tcontext "github.com/pingcap/dumpling/v4/context"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)

var cmuxReadTimeout = 10 * time.Second
Expand All @@ -35,7 +36,7 @@ func startHTTPServer(tctx *tcontext.Context, lis net.Listener) {
err := httpServer.Serve(lis)
err = errors.Cause(err)
if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed {
tctx.L().Warn("http server return with error", zap.Error(err))
tctx.L().Warn("dumpling http handler return with error", log.ShortError(err))
}
}

Expand Down
28 changes: 15 additions & 13 deletions dumpling/v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strconv"
"strings"

"github.com/pingcap/dumpling/v4/log"

"github.com/go-sql-driver/mysql"

tcontext "github.com/pingcap/dumpling/v4/context"
Expand Down Expand Up @@ -586,23 +588,21 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
query := "SELECT * FROM information_schema.cluster_info where type = 'pd';"
rows, err := db.QueryContext(tctx, query)
if err != nil {
tctx.L().Warn("can't execute query from db",
zap.String("query", query), zap.Error(err))
return []string{}, errors.Annotatef(err, "sql: %s", query)
}
return GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS")
pdAddrs, err := GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS")
return pdAddrs, errors.Annotatef(err, "sql: %s", query)
}

// GetTiDBDDLIDs gets DDL IDs from TiDB
func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
query := "SELECT * FROM information_schema.tidb_servers_info;"
rows, err := db.QueryContext(tctx, query)
if err != nil {
tctx.L().Warn("can't execute query from db",
zap.String("query", query), zap.Error(err))
return []string{}, errors.Annotatef(err, "sql: %s", query)
}
return GetSpecifiedColumnValueAndClose(rows, "DDL_ID")
ddlIDs, err := GetSpecifiedColumnValueAndClose(rows, "DDL_ID")
return ddlIDs, errors.Annotatef(err, "sql: %s", query)
}

// CheckTiDBWithTiKV use sql to check whether current TiDB has TiKV
Expand All @@ -612,7 +612,9 @@ func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
row := db.QueryRow(query)
err := row.Scan(&count)
if err != nil {
return false, errors.Annotatef(err, "sql: %s", query)
// still return true here. Because sometimes users may not have privileges for MySQL.TiDB database
// In most production cases TiDB has TiKV
return true, errors.Annotatef(err, "sql: %s", query)
}
return count > 0, nil
}
Expand Down Expand Up @@ -995,22 +997,22 @@ func estimateCount(tctx *tcontext.Context, dbName, tableName string, db *sql.Con
func detectEstimateRows(tctx *tcontext.Context, db *sql.Conn, query string, fieldNames []string) uint64 {
rows, err := db.QueryContext(tctx, query)
if err != nil {
tctx.L().Warn("can't execute query from db",
zap.String("query", query), zap.Error(err))
tctx.L().Warn("can't detect estimate rows from db",
zap.String("query", query), log.ShortError(err))
return 0
}
defer rows.Close()
rows.Next()
columns, err := rows.Columns()
if err != nil {
tctx.L().Warn("can't get columns from db",
zap.String("query", query), zap.Error(err))
zap.String("query", query), log.ShortError(err))
return 0
}
err = rows.Err()
if err != nil {
tctx.L().Warn("rows meet some error during the query",
zap.String("query", query), zap.Error(err))
zap.String("query", query), log.ShortError(err))
return 0
}
addr := make([]interface{}, len(columns))
Expand All @@ -1031,14 +1033,14 @@ found:
err = rows.Scan(addr...)
if err != nil || fieldIndex < 0 {
tctx.L().Warn("can't get estimate count from db",
zap.String("query", query), zap.Error(err))
zap.String("query", query), log.ShortError(err))
return 0
}

estRows, err := strconv.ParseFloat(oneRow[fieldIndex].String, 64)
if err != nil {
tctx.L().Warn("can't get parse rows from db",
zap.String("query", query), zap.Error(err))
zap.String("query", query), log.ShortError(err))
return 0
}
return uint64(estRows)
Expand Down
13 changes: 7 additions & 6 deletions dumpling/v4/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

tcontext "github.com/pingcap/dumpling/v4/context"
"github.com/pingcap/dumpling/v4/log"

"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
Expand Down Expand Up @@ -177,12 +178,12 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl

defer func() {
if err != nil {
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics now",
zap.Error(err),
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", lastCounter),
zap.Uint64("finished size", wp.finishedFileSize))
zap.Uint64("finished size", wp.finishedFileSize),
log.ShortError(err))
SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
} else {
Expand Down Expand Up @@ -315,12 +316,12 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR

defer func() {
if err != nil {
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics now",
zap.Error(err),
pCtx.L().Warn("fail to dumping table(chunk), will revert some metrics and start a retry if possible",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("finished rows", lastCounter),
zap.Uint64("finished size", wp.finishedFileSize))
zap.Uint64("finished size", wp.finishedFileSize),
log.ShortError(err))
SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
} else {
Expand Down
9 changes: 9 additions & 0 deletions dumpling/v4/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,12 @@ func InitAppLogger(cfg *Config) (Logger, *pclog.ZapProperties, error) {
func NewAppLogger(logger *zap.Logger) Logger {
return Logger{logger}
}

// ShortError contructs a field which only records the error message without the
// verbose text (i.e. excludes the stack trace).
func ShortError(err error) zap.Field {
if err == nil {
return zap.Skip()
}
return zap.String("error", err.Error())
}

0 comments on commit 0aeb22d

Please sign in to comment.