From 95099237b7beb80f66efaac9f3e02bd01e68bcc2 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 25 May 2021 11:05:34 +0800 Subject: [PATCH] dump: always split TiDB v4.* tables through tidb rowid to save TiDB's memory (#273) (#280) --- tests/primary_key/data/pk_case_3.sql | 24 +- tests/primary_key/result/pk_case_3.sql | 22 +- v4/export/config.go | 1 + v4/export/dump.go | 233 ++++++++-- v4/export/sql.go | 61 ++- v4/export/sql_test.go | 560 +++++++++++++++++++++++-- v4/export/sql_type.go | 57 ++- 7 files changed, 839 insertions(+), 119 deletions(-) diff --git a/tests/primary_key/data/pk_case_3.sql b/tests/primary_key/data/pk_case_3.sql index 142102ce..ab69e5b9 100644 --- a/tests/primary_key/data/pk_case_3.sql +++ b/tests/primary_key/data/pk_case_3.sql @@ -1,14 +1,14 @@ # test random order and no primary key -create table `pk_case_3` (a int, b int); +create table `pk_case_3` (a int, b int, g geometry); insert into `pk_case_3` values -(6, 4), -(4, 6), -(8, 2), -(3, 7), -(1, 9), -(2, 8), -(5, 5), -(10, 0), -(0, 10), -(9, 1), -(7, 3); +(6, 4, ST_GeomFromText('POINT(1 1)')), +(4, 6, ST_GeomFromText('LINESTRING(2 1, 6 6)')), +(8, 2, NULL), +(3, 7, NULL), +(1, 9, NULL), +(2, 8, NULL), +(5, 5, NULL), +(10, 0, NULL), +(0, 10, NULL), +(9, 1, NULL), +(7, 3, NULL); diff --git a/tests/primary_key/result/pk_case_3.sql b/tests/primary_key/result/pk_case_3.sql index 0698d1a1..a29e568d 100644 --- a/tests/primary_key/result/pk_case_3.sql +++ b/tests/primary_key/result/pk_case_3.sql @@ -1,13 +1,13 @@ /*!40101 SET NAMES binary*/; INSERT INTO `pk_case_3` VALUES -(6,4), -(4,6), -(8,2), -(3,7), -(1,9), -(2,8), -(5,5), -(10,0), -(0,10), -(9,1), -(7,3); +(6,4,x'000000000101000000000000000000f03f000000000000f03f'), +(4,6,x'000000000102000000020000000000000000000040000000000000f03f00000000000018400000000000001840'), +(8,2,NULL), +(3,7,NULL), +(1,9,NULL), +(2,8,NULL), +(5,5,NULL), +(10,0,NULL), +(0,10,NULL), +(9,1,NULL), +(7,3,NULL); diff --git a/v4/export/config.go b/v4/export/config.go index 00d86803..a5b99f7e 100644 --- a/v4/export/config.go +++ b/v4/export/config.go @@ -565,6 +565,7 @@ var ( // ServerInfo is the combination of ServerType and ServerInfo type ServerInfo struct { + HasTiKV bool ServerType ServerType ServerVersion *semver.Version } diff --git a/v4/export/dump.go b/v4/export/dump.go index e3d72749..1ad1ca61 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -400,13 +400,13 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta } } -func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error { +func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string, currentChunk, totalChunks int) error { conf := d.conf - tableIR, err := SelectAllFromTable(conf, conn, meta) + tableIR, err := SelectAllFromTable(conf, conn, meta, partition) if err != nil { return err } - task := NewTaskTableData(meta, tableIR, 0, 1) + task := NewTaskTableData(meta, tableIR, currentChunk, totalChunks) ctxDone := d.sendTaskToChan(tctx, task, taskChan) if ctxDone { return tctx.Err() @@ -432,7 +432,7 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.String("database", meta.DatabaseName()), zap.String("table", meta.TableName())) } - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } // concurrentDumpTable tries to split table into several chunks to dump @@ -441,9 +441,8 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met db, tbl := meta.DatabaseName(), meta.TableName() if conf.ServerInfo.ServerType == ServerTypeTiDB && conf.ServerInfo.ServerVersion != nil && - conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 { - tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", - zap.String("database", db), zap.String("table", tbl)) + (conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 || + (conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*gcSafePointVersion) >= 0)) { return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) } field, err := pickupPossibleField(db, tbl, conn, conf) @@ -454,7 +453,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met // skip split chunk logic if not found proper field tctx.L().Warn("fallback to sequential dump due to no proper field", zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field) @@ -477,7 +476,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.Uint64("conf.rows", conf.Rows), zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) } // every chunk would have eventual adjustments @@ -508,7 +507,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met for max.Cmp(cutoff) >= 0 { nextCutOff := new(big.Int).Add(cutoff, bigEstimatedStep) where := fmt.Sprintf("%s(`%s` >= %d AND `%s` < %d)", nullValueCondition, escapeString(field), cutoff, escapeString(field), nextCutOff) - query := buildSelectQuery(db, tbl, selectField, buildWhereCondition(conf, where), orderByClause) + query := buildSelectQuery(db, tbl, selectField, "", buildWhereCondition(conf, where), orderByClause) if len(nullValueCondition) > 0 { nullValueCondition = "" } @@ -572,16 +571,76 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string) } func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task) error { - conf := d.conf db, tbl := meta.DatabaseName(), meta.TableName() - handleColNames, handleVals, err := selectTiDBTableSample(conn, db, tbl) + var ( + handleColNames []string + handleVals [][]string + err error + ) + if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 { + tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", + zap.String("database", db), zap.String("table", tbl)) + handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl) + } else { + tctx.L().Debug("dumping TiDB tables with TABLE REGIONS", + zap.String("database", db), zap.String("table", tbl)) + var partitions []string + partitions, err = GetPartitionNames(conn, db, tbl) + if err == nil { + if len(partitions) == 0 { + handleColNames, handleVals, err = selectTiDBTableRegion(tctx, conn, db, tbl) + } else { + return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions) + } + } + } if err != nil { return err } + return d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1) +} + +func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partitions []string) error { + db, tbl := meta.DatabaseName(), meta.TableName() + tctx.L().Debug("dumping TiDB tables with TABLE REGIONS for partition table", + zap.String("database", db), zap.String("table", tbl), zap.Strings("partitions", partitions)) + + startChunkIdx := 0 + totalChunk := 0 + cachedHandleVals := make([][][]string, len(partitions)) + + handleColNames, _, err := selectTiDBRowKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields) + if err != nil { + return err + } + // cache handleVals here to calculate the total chunks + for i, partition := range partitions { + handleVals, err := selectTiDBPartitionRegion(tctx, conn, db, tbl, partition) + if err != nil { + return err + } + totalChunk += len(handleVals) + 1 + cachedHandleVals[i] = handleVals + } + for i, partition := range partitions { + err := d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk) + if err != nil { + return err + } + startChunkIdx += len(cachedHandleVals[i]) + 1 + } + return nil +} + +func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context, + conn *sql.Conn, meta TableMeta, taskChan chan<- Task, + handleColNames []string, handleVals [][]string, partition string, startChunkIdx, totalChunk int) error { if len(handleVals) == 0 { - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan) + return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition, startChunkIdx, totalChunk) } + conf := d.conf + db, tbl := meta.DatabaseName(), meta.TableName() selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) if err != nil { return err @@ -590,8 +649,8 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn orderByClause := buildOrderByClauseString(handleColNames) for i, w := range where { - query := buildSelectQuery(db, tbl, selectField, buildWhereCondition(conf, w), orderByClause) - task := NewTaskTableData(meta, newTableData(query, selectLen, false), i, len(where)) + query := buildSelectQuery(db, tbl, selectField, partition, buildWhereCondition(conf, w), orderByClause) + task := NewTaskTableData(meta, newTableData(query, selectLen, false), i+startChunkIdx, totalChunk) ctxDone := d.sendTaskToChan(tctx, task, taskChan) if ctxDone { return tctx.Err() @@ -605,23 +664,14 @@ func (d *Dumper) L() log.Logger { return d.tctx.L() } -func selectTiDBTableSample(conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, pkColTypes, err := GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) - if err != nil { - return nil, nil, errors.Trace(err) - } - hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) +func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { + pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, dbName, tableName, nil) if err != nil { return nil, nil, errors.Trace(err) } - if hasImplicitRowID { - pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} - } - if len(pkFields) == 0 { - return pkFields, pkVals, nil - } + query := buildTiDBTableSampleQuery(pkFields, dbName, tableName) - rows, err := conn.QueryContext(context.Background(), query) + rows, err := conn.QueryContext(tctx, query) if err != nil { return nil, nil, errors.Trace(err) } @@ -645,7 +695,8 @@ func selectTiDBTableSample(conn *sql.Conn, dbName, tableName string) (pkFields [ pkVals = append(pkVals, pkValRow) iter.Next() } - return pkFields, pkVals, nil + iter.Close() + return pkFields, pkVals, iter.Error() } func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string { @@ -658,6 +709,124 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks) } +func selectTiDBRowKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { + hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) + if err != nil { + return + } + if hasImplicitRowID { + pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} + } else { + pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) + if err == nil { + if checkPkFields != nil { + err = checkPkFields(pkFields, pkColTypes) + } + } + } + return +} + +func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) { + if len(pkFields) != 1 || len(pkColTypes) != 1 { + err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + return + } + if _, ok := dataTypeNum[pkColTypes[0]]; !ok { + err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", ")) + } + return +} + +func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { + pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) + if err != nil { + return + } + + var ( + startKey, decodedKey sql.NullString + rowID = -1 + ) + const ( + tableRegionSQL = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;" + tidbRowID = "_tidb_rowid=" + ) + logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName)) + err = simpleQueryWithArgs(conn, func(rows *sql.Rows) error { + rowID++ + err = rows.Scan(&startKey, &decodedKey) + if err != nil { + return errors.Trace(err) + } + // first region's start key has no use. It may come from another table or might be invalid + if rowID == 0 { + return nil + } + if !startKey.Valid { + logger.Debug("meet invalid start key", zap.Int("rowID", rowID)) + return nil + } + if !decodedKey.Valid { + logger.Debug("meet invalid decoded start key", zap.Int("rowID", rowID), zap.String("startKey", startKey.String)) + return nil + } + pkVal, err2 := extractTiDBRowIDFromDecodedKey(tidbRowID, decodedKey.String) + if err2 != nil { + logger.Debug("fail to extract pkVal from decoded start key", + zap.Int("rowID", rowID), zap.String("startKey", startKey.String), zap.String("decodedKey", decodedKey.String), zap.Error(err2)) + } else { + pkVals = append(pkVals, []string{pkVal}) + } + return nil + }, tableRegionSQL, dbName, tableName) + + return pkFields, pkVals, errors.Trace(err) +} + +func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName, partition string) (pkVals [][]string, err error) { + var ( + rows *sql.Rows + startKeys []string + ) + const ( + partitionRegionSQL = "SHOW TABLE `%s`.`%s` PARTITION(`%s`) REGIONS" + regionRowKey = "r_" + ) + logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName), zap.String("partition", partition)) + rows, err = conn.QueryContext(tctx, fmt.Sprintf(partitionRegionSQL, escapeString(dbName), escapeString(tableName), escapeString(partition))) + if err != nil { + err = errors.Trace(err) + return + } + startKeys, err = GetSpecifiedColumnValueAndClose(rows, "START_KEY") + if err != nil { + return + } + for rowID, startKey := range startKeys { + if rowID == 0 { + continue + } + pkVal, err2 := extractTiDBRowIDFromDecodedKey(regionRowKey, startKey) + if err2 != nil { + logger.Debug("show table region start key doesn't have rowID", + zap.Int("rowID", rowID), zap.String("startKey", startKey), zap.Error(err2)) + } else { + pkVals = append(pkVals, []string{pkVal}) + } + } + + return pkVals, err +} + +func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) { + if p := strings.Index(key, indexField); p != -1 { + p += len(indexField) + return key[p:], nil + } + return "", errors.Errorf("decoded key %s doesn't have %s field", key, indexField) +} + func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error { databases, err := prepareDumpingDatabases(conf, db) if err != nil { @@ -968,21 +1137,21 @@ func setSessionParam(d *Dumper) error { if si.ServerType == ServerTypeTiDB && conf.TiDBMemQuotaQuery != UnspecifiedSize { sessionParam[TiDBMemQuotaQueryName] = conf.TiDBMemQuotaQuery } + var err error if snapshot != "" { if si.ServerType != ServerTypeTiDB { return errors.New("snapshot consistency is not supported for this server") } if consistency == consistencyTypeSnapshot { - hasTiKV, err := CheckTiDBWithTiKV(pool) + conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool) if err != nil { return err } - if hasTiKV { + if conf.ServerInfo.HasTiKV { sessionParam["tidb_snapshot"] = snapshot } } } - var err error if d.dbHandle, err = resetDBWithSessionParams(d.tctx, pool, conf.GetDSN(""), conf.SessionParams); err != nil { return errors.Trace(err) } diff --git a/v4/export/sql.go b/v4/export/sql.go index 2e160d8b..655d9c2b 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -18,6 +18,8 @@ import ( "go.uber.org/zap" ) +const orderByTiDBRowID = "ORDER BY `_tidb_rowid`" + // ShowDatabases shows the databases of a database server. func ShowDatabases(db *sql.Conn) ([]string, error) { var res oneStrColumnTable @@ -186,7 +188,7 @@ func SelectVersion(db *sql.DB) (string, error) { } // SelectAllFromTable dumps data serialized from a specified table -func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR, error) { +func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta, partition string) (TableDataIR, error) { database, table := meta.DatabaseName(), meta.TableName() selectedField, selectLen, err := buildSelectField(db, database, table, conf.CompleteInsert) if err != nil { @@ -197,7 +199,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR if err != nil { return nil, err } - query := buildSelectQuery(database, table, selectedField, buildWhereCondition(conf, ""), orderByClause) + query := buildSelectQuery(database, table, selectedField, partition, buildWhereCondition(conf, ""), orderByClause) return &tableData{ query: query, @@ -205,7 +207,7 @@ func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta) (TableDataIR }, nil } -func buildSelectQuery(database, table string, fields string, where string, orderByClause string) string { +func buildSelectQuery(database, table, fields, partition, where, orderByClause string) string { var query strings.Builder query.WriteString("SELECT ") if fields == "" { @@ -218,7 +220,12 @@ func buildSelectQuery(database, table string, fields string, where string, order query.WriteString(escapeString(database)) query.WriteString("`.`") query.WriteString(escapeString(table)) - query.WriteString("`") + query.WriteByte('`') + if partition != "" { + query.WriteString(" PARTITION(`") + query.WriteString(escapeString(partition)) + query.WriteString("`)") + } if where != "" { query.WriteString(" ") @@ -243,7 +250,7 @@ func buildOrderByClause(conf *Config, db *sql.Conn, database, table string) (str return "", errors.Trace(err) } if ok { - return "ORDER BY `_tidb_rowid`", nil + return orderByTiDBRowID, nil } } cols, err := GetPrimaryKeyColumns(db, database, table) @@ -433,8 +440,13 @@ func ShowMasterStatus(db *sql.Conn) ([]string, error) { return oneRow, nil } -// GetSpecifiedColumnValue get columns' values whose name is equal to columnName -func GetSpecifiedColumnValue(rows *sql.Rows, columnName string) ([]string, error) { +// GetSpecifiedColumnValueAndClose get columns' values whose name is equal to columnName and close the given rows +func GetSpecifiedColumnValueAndClose(rows *sql.Rows, columnName string) ([]string, error) { + if rows == nil { + return []string{}, nil + } + defer rows.Close() + columnName = strings.ToUpper(columnName) var strs []string columns, _ := rows.Columns() addr := make([]interface{}, len(columns)) @@ -458,7 +470,7 @@ func GetSpecifiedColumnValue(rows *sql.Rows, columnName string) ([]string, error strs = append(strs, oneRow[fieldIndex].String) } } - return strs, nil + return strs, errors.Trace(rows.Err()) } // GetPdAddrs gets PD address from TiDB @@ -470,8 +482,7 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - defer rows.Close() - return GetSpecifiedColumnValue(rows, "STATUS_ADDRESS") + return GetSpecifiedColumnValueAndClose(rows, "STATUS_ADDRESS") } // GetTiDBDDLIDs gets DDL IDs from TiDB @@ -483,8 +494,7 @@ func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) { zap.String("query", query), zap.Error(err)) return []string{}, errors.Annotatef(err, "sql: %s", query) } - defer rows.Close() - return GetSpecifiedColumnValue(rows, "DDL_ID") + return GetSpecifiedColumnValueAndClose(rows, "DDL_ID") } // CheckTiDBWithTiKV use sql to check whether current TiDB has TiKV @@ -615,7 +625,7 @@ func buildSelectField(db *sql.Conn, dbName, tableName string, completeInsert boo } func buildWhereClauses(handleColNames []string, handleVals [][]string) []string { - if len(handleColNames) == 0 { + if len(handleColNames) == 0 || len(handleVals) == 0 { return nil } quotaCols := make([]string, len(handleColNames)) @@ -817,7 +827,7 @@ func simpleQueryWithArgs(conn *sql.Conn, handleOneRow func(*sql.Rows) error, sql } } rows.Close() - return rows.Err() + return errors.Annotatef(rows.Err(), "sql: %s", sql) } func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (string, error) { @@ -964,14 +974,14 @@ func buildWhereCondition(conf *Config, where string) string { var query strings.Builder separator := "WHERE" if conf.Where != "" { - query.WriteString(" ") query.WriteString(separator) - query.WriteString(" ") + query.WriteByte(' ') query.WriteString(conf.Where) + query.WriteByte(' ') separator = "AND" + query.WriteByte(' ') } if where != "" { - query.WriteString(" ") query.WriteString(separator) query.WriteString(" ") query.WriteString(where) @@ -982,3 +992,20 @@ func buildWhereCondition(conf *Config, where string) string { func escapeString(s string) string { return strings.ReplaceAll(s, "`", "``") } + +// GetPartitionNames get partition names from a specified table +func GetPartitionNames(db *sql.Conn, schema, table string) (partitions []string, err error) { + partitions = make([]string, 0) + var partitionName sql.NullString + err = simpleQueryWithArgs(db, func(rows *sql.Rows) error { + err := rows.Scan(&partitionName) + if err != nil { + return errors.Trace(err) + } + if partitionName.Valid { + partitions = append(partitions, partitionName.String) + } + return nil + }, "SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?", schema, table) + return +} diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index e346fa15..7d8c94ad 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -87,7 +87,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", orderByClause) + q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `_tidb_rowid`") // _tidb_rowid is unavailable, or PKIsHandle. @@ -107,7 +107,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", orderByClause) + q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`") c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -130,7 +130,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", orderByClause) + q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`", cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) @@ -154,7 +154,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", orderByClause) + q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) @@ -173,7 +173,7 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", "") + q := buildSelectQuery("test", "t", selectedField, "", "", "") c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) c.Assert(mock.ExpectationsWereMet(), IsNil, cmt) } @@ -198,7 +198,7 @@ func (s *testSQLSuite) TestBuildOrderByClause(c *C) { orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") c.Assert(err, IsNil) - c.Assert(orderByClause, Equals, "ORDER BY `_tidb_rowid`") + c.Assert(orderByClause, Equals, orderByTiDBRowID) // _tidb_rowid is unavailable, or PKIsHandle. mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). @@ -358,11 +358,13 @@ func (s *testSQLSuite) TestGetSuitableRows(c *C) { defer db.Close() conn, err := db.Conn(context.Background()) c.Assert(err, IsNil) - const query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=\\? and table_name=\\?;" tctx, cancel := tcontext.Background().WithCancel() defer cancel() - database := "foo" - table := "bar" + const ( + query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=\\? and table_name=\\?;" + database = "foo" + table = "bar" + ) testCases := []struct { avgRowLength uint64 @@ -409,7 +411,7 @@ func (s *testSQLSuite) TestGetSuitableRows(c *C) { } } -func (s *testSQLSuite) TestBuildWhereClauses(c *C) { +func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) defer db.Close() @@ -423,29 +425,59 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { cancelCtx: cancel, } d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, ServerType: ServerTypeTiDB, ServerVersion: tableSampleVersion, } - database := "foo" - table := "bar" + + const ( + database = "foo" + table = "bar" + ) testCases := []struct { handleColNames []string handleColTypes []string handleVals [][]driver.Value expectedWhereClauses []string + hasTiDBRowID bool }{ { []string{}, []string{}, [][]driver.Value{}, nil, + false, }, { []string{"a"}, []string{"bigint"}, [][]driver.Value{{1}}, []string{"`a`<1", "`a`>=1"}, + false, + }, + // check whether dumpling can turn to dump whole table + { + []string{"a"}, + []string{"bigint"}, + [][]driver.Value{}, + nil, + false, + }, + // check whether dumpling can turn to dump whole table + { + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]driver.Value{}, + nil, + true, + }, + { + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]driver.Value{{1}}, + []string{"`_tidb_rowid`<1", "`_tidb_rowid`>=1"}, + true, }, { []string{"a"}, @@ -456,12 +488,14 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { {3}, }, []string{"`a`<1", "`a`>=1 and `a`<2", "`a`>=2 and `a`<3", "`a`>=3"}, + false, }, { []string{"a", "b"}, []string{"bigint", "bigint"}, [][]driver.Value{{1, 2}}, []string{"`a`<1 or(`a`=1 and `b`<2)", "`a`>1 or(`a`=1 and `b`>=2)"}, + false, }, { []string{"a", "b"}, @@ -477,6 +511,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>3 and `a`<5)or(`a`=3 and(`b`>=4))or(`a`=5 and(`b`<6))", "`a`>5 or(`a`=5 and `b`>=6)", }, + false, }, { []string{"a", "b", "c"}, @@ -490,6 +525,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>1 and `a`<4)or(`a`=1 and(`b`>2 or(`b`=2 and `c`>=3)))or(`a`=4 and(`b`<5 or(`b`=5 and `c`<6)))", "`a`>4 or(`a`=4 and `b`>5)or(`a`=4 and `b`=5 and `c`>=6)", }, + false, }, { []string{"a", "b", "c"}, @@ -503,6 +539,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and((`b`>2 and `b`<4)or(`b`=2 and(`c`>=3))or(`b`=4 and(`c`<5)))", "`a`>1 or(`a`=1 and `b`>4)or(`a`=1 and `b`=4 and `c`>=5)", }, + false, }, { []string{"a", "b", "c"}, @@ -516,6 +553,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and `b`=2 and(`c`>=3 and `c`<8)", "`a`>1 or(`a`=1 and `b`>2)or(`a`=1 and `b`=2 and `c`>=8)", }, + false, }, // special case: avoid return same samples { @@ -530,6 +568,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "false", "`a`>1 or(`a`=1 and `b`>2)or(`a`=1 and `b`=2 and `c`>=3)", }, + false, }, // special case: numbers has bigger lexicographically order but lower number { @@ -544,6 +583,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>12 and `a`<111)or(`a`=12 and(`b`>2 or(`b`=2 and `c`>=3)))or(`a`=111 and(`b`<4 or(`b`=4 and `c`<5)))", // should return sql correctly "`a`>111 or(`a`=111 and `b`>4)or(`a`=111 and `b`=4 and `c`>=5)", }, + false, }, // test string fields { @@ -558,6 +598,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "`a`=1 and((`b`>2 and `b`<4)or(`b`=2 and(`c`>='3'))or(`b`=4 and(`c`<'5')))", "`a`>1 or(`a`=1 and `b`>4)or(`a`=1 and `b`=4 and `c`>='5')", }, + false, }, { []string{"a", "b", "c", "d"}, @@ -571,6 +612,7 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { "(`a`>1 and `a`<5)or(`a`=1 and(`b`>2 or(`b`=2 and `c`>3)or(`b`=2 and `c`=3 and `d`>=4)))or(`a`=5 and(`b`<6 or(`b`=6 and `c`<7)or(`b`=6 and `c`=7 and `d`<8)))", "`a`>5 or(`a`=5 and `b`>6)or(`a`=5 and `b`=6 and `c`>7)or(`a`=5 and `b`=6 and `c`=7 and `d`>=8)", }, + false, }, } transferHandleValStrings := func(handleColTypes []string, handleVals [][]driver.Value) [][]string { @@ -595,8 +637,8 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { return handleValStrings } - for i, testCase := range testCases { - c.Log(fmt.Sprintf("case #%d", i)) + for caseID, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", caseID)) handleColNames := testCase.handleColNames handleColTypes := testCase.handleColTypes handleVals := testCase.handleVals @@ -623,44 +665,514 @@ func (s *testSQLSuite) TestBuildWhereClauses(c *C) { }, } + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnError(&mysql.MyError{ + Code: mysql.ER_BAD_FIELD_ERROR, + State: "42S22", + Message: "Unknown column '_tidb_rowid' in 'field list'", + }) + rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) + for i := range handleColNames { + rows.AddRow(handleColNames[i], handleColTypes[i]) + } + mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } + + rows := sqlmock.NewRows(handleColNames) + for _, handleVal := range handleVals { + rows.AddRow(handleVal...) + } + mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows) + + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) + for _, handleCol := range handleColNames { + rows.AddRow(handleCol, "") + } + mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). + WillReturnRows(rows) + // special case, no value found, will scan whole table and try build order clause + if len(handleVals) == 0 { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } + + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + orderByClause := buildOrderByClauseString(handleColNames) + + checkQuery := func(i int, query string) { + task := <-taskChan + taskTableData, ok := task.(*TaskTableData) + c.Assert(ok, IsTrue) + c.Assert(taskTableData.ChunkIndex, Equals, i) + data, ok := taskTableData.Data.(*tableData) + c.Assert(ok, IsTrue) + c.Assert(data.query, Equals, query) + } + + // special case, no value found + if len(handleVals) == 0 { + orderByClause = orderByTiDBRowID + query := buildSelectQuery(database, table, "*", "", "", orderByClause) + checkQuery(0, query) + continue + } + + for i, w := range testCase.expectedWhereClauses { + query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + checkQuery(i, query) + } + } + } +} + +func (s *testSQLSuite) TestBuildPartitionClauses(c *C) { + const ( + dbName = "test" + tbName = "t" + fields = "*" + partition = "p0" + where = "WHERE a > 10" + orderByClause = "ORDER BY a" + ) + testCases := []struct { + partition string + where string + orderByClause string + expectedQuery string + }{ + { + "", + "", + "", + "SELECT * FROM `test`.`t`", + }, + { + partition, + "", + "", + "SELECT * FROM `test`.`t` PARTITION(`p0`)", + }, + { + partition, + where, + "", + "SELECT * FROM `test`.`t` PARTITION(`p0`) WHERE a > 10", + }, + { + partition, + "", + orderByClause, + "SELECT * FROM `test`.`t` PARTITION(`p0`) ORDER BY a", + }, + { + partition, + where, + orderByClause, + "SELECT * FROM `test`.`t` PARTITION(`p0`) WHERE a > 10 ORDER BY a", + }, + { + "", + where, + orderByClause, + "SELECT * FROM `test`.`t` WHERE a > 10 ORDER BY a", + }, + } + for _, testCase := range testCases { + query := buildSelectQuery(dbName, tbName, fields, testCase.partition, testCase.where, testCase.orderByClause) + c.Assert(query, Equals, testCase.expectedQuery) + } +} + +func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + + d := &Dumper{ + tctx: tctx, + conf: DefaultConfig(), + cancelCtx: cancel, + } + d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, + ServerType: ServerTypeTiDB, + ServerVersion: gcSafePointVersion, + } + database := "foo" + table := "bar" + + testCases := []struct { + regionResults [][]driver.Value + handleColNames []string + handleColTypes []string + expectedWhereClauses []string + hasTiDBRowID bool + }{ + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + }, + []string{"a"}, + []string{"bigint"}, + []string{ + "", + }, + false, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + []string{ + "", + }, + true, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + {"7480000000000000FF335F728000000000FF0EA6010000000000FA", "tableID=51, _tidb_rowid=960001"}, + {"7480000000000000FF335F728000000000FF1D4C010000000000FA", "tableID=51, _tidb_rowid=1920001"}, + {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, + }, + []string{"a"}, + []string{"bigint"}, + []string{ + "`a`<960001", + "`a`>=960001 and `a`<1920001", + "`a`>=1920001 and `a`<2880001", + "`a`>=2880001", + }, + false, + }, + { + [][]driver.Value{ + {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, + {"7480000000000000FF335F728000000000FF0EA6010000000000FA", "tableID=51, _tidb_rowid=960001"}, + // one invalid key + {"7520000000000000FF335F728000000000FF0EA6010000000000FA", "7520000000000000FF335F728000000000FF0EA6010000000000FA"}, + {"7480000000000000FF335F728000000000FF1D4C010000000000FA", "tableID=51, _tidb_rowid=1920001"}, + {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + []string{ + "`_tidb_rowid`<960001", + "`_tidb_rowid`>=960001 and `_tidb_rowid`<1920001", + "`_tidb_rowid`>=1920001 and `_tidb_rowid`<2880001", + "`_tidb_rowid`>=2880001", + }, + true, + }, + } + + for caseID, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", caseID)) + handleColNames := testCase.handleColNames + handleColTypes := testCase.handleColTypes + regionResults := testCase.regionResults + + // Test build tasks through table region + taskChan := make(chan Task, 128) + quotaCols := make([]string, 0, len(handleColNames)) + for _, col := range quotaCols { + quotaCols = append(quotaCols, wrapBackTicks(col)) + } + selectFields := strings.Join(quotaCols, ",") + meta := &tableMeta{ + database: database, + table: table, + selectedField: selectFields, + specCmts: []string{ + "/*!40101 SET NAMES binary*/;", + }, + } + + mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). + WithArgs(database, table).WillReturnRows(sqlmock.NewRows([]string{"PARTITION_NAME"}).AddRow(nil)) + + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnError(&mysql.MyError{ + Code: mysql.ER_BAD_FIELD_ERROR, + State: "42S22", + Message: "Unknown column '_tidb_rowid' in 'field list'", + }) rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) for i := range handleColNames { rows.AddRow(handleColNames[i], handleColTypes[i]) } mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } + + rows := sqlmock.NewRows([]string{"START_KEY", "tidb_decode_key(START_KEY)"}) + for _, regionResult := range regionResults { + rows.AddRow(regionResult...) + } + mock.ExpectQuery("SELECT START_KEY,tidb_decode_key\\(START_KEY\\) from INFORMATION_SCHEMA.TIKV_REGION_STATUS"). + WithArgs(database, table).WillReturnRows(rows) + + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) + for _, handleCol := range handleColNames { + rows.AddRow(handleCol, "") + } + mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). + WillReturnRows(rows) + + orderByClause := buildOrderByClauseString(handleColNames) + // special case, no enough value to split chunks + if len(regionResults) <= 1 { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + orderByClause = orderByTiDBRowID + } + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + + for i, w := range testCase.expectedWhereClauses { + query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + task := <-taskChan + taskTableData, ok := task.(*TaskTableData) + c.Assert(ok, IsTrue) + c.Assert(taskTableData.ChunkIndex, Equals, i) + data, ok := taskTableData.Data.(*tableData) + c.Assert(ok, IsTrue) + c.Assert(data.query, Equals, query) + } + } +} + +func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + + d := &Dumper{ + tctx: tctx, + conf: DefaultConfig(), + cancelCtx: cancel, + } + d.conf.ServerInfo = ServerInfo{ + HasTiKV: true, + ServerType: ServerTypeTiDB, + ServerVersion: gcSafePointVersion, + } + database := "foo" + table := "bar" + partitions := []string{"p0", "p1", "p2"} + + testCases := []struct { + regionResults [][][]driver.Value + handleColNames []string + handleColTypes []string + expectedWhereClauses [][]string + hasTiDBRowID bool + dumpWholeTable bool + }{ + { + [][][]driver.Value{ + { + {6009, "t_121_i_1_0380000000000ea6010380000000000ea601", "t_121_", 6010, 1, 6010, 0, 0, 0, 74, 1052002}, + {6011, "t_121_", "t_121_i_1_0380000000000ea6010380000000000ea601", 6012, 1, 6012, 0, 0, 0, 68, 972177}, + }, + { + {6015, "t_122_i_1_0380000000002d2a810380000000002d2a81", "t_122_", 6016, 1, 6016, 0, 0, 0, 77, 1092962}, + {6017, "t_122_", "t_122_i_1_0380000000002d2a810380000000002d2a81", 6018, 1, 6018, 0, 0, 0, 66, 939975}, + }, + { + {6021, "t_123_i_1_0380000000004baf010380000000004baf01", "t_123_", 6022, 1, 6022, 0, 0, 0, 85, 1206726}, + {6023, "t_123_", "t_123_i_1_0380000000004baf010380000000004baf01", 6024, 1, 6024, 0, 0, 0, 65, 927576}, + }, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]string{ + {""}, {""}, {""}, + }, + true, + true, + }, + { + [][][]driver.Value{ + { + {6009, "t_121_i_1_0380000000000ea6010380000000000ea601", "t_121_r_10001", 6010, 1, 6010, 0, 0, 0, 74, 1052002}, + {6013, "t_121_r_10001", "t_121_r_970001", 6014, 1, 6014, 0, 0, 0, 75, 975908}, + {6003, "t_121_r_970001", "t_122_", 6004, 1, 6004, 0, 0, 0, 79, 1022285}, + {6011, "t_121_", "t_121_i_1_0380000000000ea6010380000000000ea601", 6012, 1, 6012, 0, 0, 0, 68, 972177}, + }, + { + {6015, "t_122_i_1_0380000000002d2a810380000000002d2a81", "t_122_r_2070760", 6016, 1, 6016, 0, 0, 0, 77, 1092962}, + {6019, "t_122_r_2070760", "t_122_r_3047115", 6020, 1, 6020, 0, 0, 0, 75, 959650}, + {6005, "t_122_r_3047115", "t_123_", 6006, 1, 6006, 0, 0, 0, 77, 992339}, + {6017, "t_122_", "t_122_i_1_0380000000002d2a810380000000002d2a81", 6018, 1, 6018, 0, 0, 0, 66, 939975}, + }, + { + {6021, "t_123_i_1_0380000000004baf010380000000004baf01", "t_123_r_4186953", 6022, 1, 6022, 0, 0, 0, 85, 1206726}, + {6025, "t_123_r_4186953", "t_123_r_5165682", 6026, 1, 6026, 0, 0, 0, 74, 951379}, + {6007, "t_123_r_5165682", "t_124_", 6008, 1, 6008, 0, 0, 0, 71, 918488}, + {6023, "t_123_", "t_123_i_1_0380000000004baf010380000000004baf01", 6024, 1, 6024, 0, 0, 0, 65, 927576}, + }, + }, + []string{"_tidb_rowid"}, + []string{"bigint"}, + [][]string{ + { + "`_tidb_rowid`<10001", + "`_tidb_rowid`>=10001 and `_tidb_rowid`<970001", + "`_tidb_rowid`>=970001", + }, + { + "`_tidb_rowid`<2070760", + "`_tidb_rowid`>=2070760 and `_tidb_rowid`<3047115", + "`_tidb_rowid`>=3047115", + }, + { + "`_tidb_rowid`<4186953", + "`_tidb_rowid`>=4186953 and `_tidb_rowid`<5165682", + "`_tidb_rowid`>=5165682", + }, + }, + true, + false, + }, + { + [][][]driver.Value{ + { + {6041, "t_134_", "t_134_r_960001", 6042, 1, 6042, 0, 0, 0, 69, 964987}, + {6035, "t_134_r_960001", "t_135_", 6036, 1, 6036, 0, 0, 0, 75, 1052130}, + }, + { + {6043, "t_135_", "t_135_r_2960001", 6044, 1, 6044, 0, 0, 0, 69, 969576}, + {6037, "t_135_r_2960001", "t_136_", 6038, 1, 6038, 0, 0, 0, 72, 1014464}, + }, + { + {6045, "t_136_", "t_136_r_4960001", 6046, 1, 6046, 0, 0, 0, 68, 957557}, + {6039, "t_136_r_4960001", "t_137_", 6040, 1, 6040, 0, 0, 0, 75, 1051579}, + }, + }, + []string{"a"}, + []string{"bigint"}, + [][]string{ + + { + "`a`<960001", + "`a`>=960001", + }, + { + "`a`<2960001", + "`a`>=2960001", + }, + { + "`a`<4960001", + "`a`>=4960001", + }, + }, + false, + false, + }, + } + + for i, testCase := range testCases { + c.Log(fmt.Sprintf("case #%d", i)) + handleColNames := testCase.handleColNames + handleColTypes := testCase.handleColTypes + regionResults := testCase.regionResults + + // Test build tasks through table region + taskChan := make(chan Task, 128) + quotaCols := make([]string, 0, len(handleColNames)) + for _, col := range quotaCols { + quotaCols = append(quotaCols, wrapBackTicks(col)) + } + selectFields := strings.Join(quotaCols, ",") + meta := &tableMeta{ + database: database, + table: table, + selectedField: selectFields, + specCmts: []string{ + "/*!40101 SET NAMES binary*/;", + }, + } + + rows := sqlmock.NewRows([]string{"PARTITION_NAME"}) + for _, partition := range partitions { + rows.AddRow(partition) + } + mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). + WithArgs(database, table).WillReturnRows(rows) + + if testCase.hasTiDBRowID { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } else { mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). WillReturnError(&mysql.MyError{ Code: mysql.ER_BAD_FIELD_ERROR, State: "42S22", Message: "Unknown column '_tidb_rowid' in 'field list'", }) + rows = sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) + for i := range handleColNames { + rows.AddRow(handleColNames[i], handleColTypes[i]) + } + mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + } - rows = sqlmock.NewRows(handleColNames) - for _, handleVal := range handleVals { - rows.AddRow(handleVal...) + for i, partition := range partitions { + rows = sqlmock.NewRows([]string{"REGION_ID", "START_KEY", "END_KEY", "LEADER_ID", "LEADER_STORE_ID", "PEERS", "SCATTERING", "WRITTEN_BYTES", "READ_BYTES", "APPROXIMATE_SIZE(MB)", "APPROXIMATE_KEYS"}) + for _, regionResult := range regionResults[i] { + rows.AddRow(regionResult...) } - mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf("SHOW TABLE `%s`.`%s` PARTITION\\(`%s`\\) REGIONS", escapeString(database), escapeString(table), escapeString(partition))). + WillReturnRows(rows) + } + for range partitions { rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) for _, handleCol := range handleColNames { rows.AddRow(handleCol, "") } mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). WillReturnRows(rows) + // special case, dump whole table + if testCase.dumpWholeTable { + mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). + WillReturnResult(sqlmock.NewResult(0, 0)) + } + } - c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) - c.Assert(mock.ExpectationsWereMet(), IsNil) - orderByClause := buildOrderByClauseString(handleColNames) + orderByClause := buildOrderByClauseString(handleColNames) + c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) - for i, w := range testCase.expectedWhereClauses { - query := buildSelectQuery(database, table, "*", buildWhereCondition(d.conf, w), orderByClause) + chunkIdx := 0 + for i, partition := range partitions { + for _, w := range testCase.expectedWhereClauses[i] { + query := buildSelectQuery(database, table, "*", partition, buildWhereCondition(d.conf, w), orderByClause) task := <-taskChan taskTableData, ok := task.(*TaskTableData) c.Assert(ok, IsTrue) - c.Assert(taskTableData.ChunkIndex, Equals, i) + c.Assert(taskTableData.ChunkIndex, Equals, chunkIdx) data, ok := taskTableData.Data.(*tableData) c.Assert(ok, IsTrue) c.Assert(data.query, Equals, query) + chunkIdx++ } } } diff --git a/v4/export/sql_type.go b/v4/export/sql_type.go index 6e117f08..69450961 100644 --- a/v4/export/sql_type.go +++ b/v4/export/sql_type.go @@ -17,38 +17,49 @@ var ( doubleQuotationMark = []byte{'"'} ) +// There are two kinds of scenes to use this dataType +// The first is to be the receiver of table sample, which will use tidb's INFORMATION_SCHEMA.COLUMNS's DATA_TYPE column, which is from +// https://github.com/pingcap/tidb/blob/619c4720059ea619081b01644ef3084b426d282f/executor/infoschema_reader.go#L654 +// https://github.com/pingcap/parser/blob/8e8ed7927bde11c4cf0967afc5e05ab5aeb14cc7/types/etc.go#L44-70 +// The second is to be the receiver of select row type, which will use sql.DB's rows.DatabaseTypeName(), which is from +// https://github.com/go-sql-driver/mysql/blob/v1.5.0/fields.go#L17-97 func initColTypeRowReceiverMap() { - for _, s := range dataTypeString { + dataTypeStringArr := []string{ + "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", + "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", + "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", + "ENUM", "SET", "JSON", "NULL", "VAR_STRING", + } + + dataTypeNumArr := []string{ + "INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT", + "INT", "INT1", "INT2", "INT3", "INT8", + "FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION", + "DECIMAL", "NUMERIC", "FIXED", + "BOOL", "BOOLEAN", + } + + dataTypeBinArr := []string{ + "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", + "BINARY", "VARBINARY", + "BIT", "GEOMETRY", + } + + for _, s := range dataTypeStringArr { + dataTypeString[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeStringMaker } - for _, s := range dataTypeNum { + for _, s := range dataTypeNumArr { + dataTypeNum[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeNumberMaker } - for _, s := range dataTypeBin { + for _, s := range dataTypeBinArr { + dataTypeBin[s] = struct{}{} colTypeRowReceiverMap[s] = SQLTypeBytesMaker } } -var dataTypeString = []string{ - "CHAR", "NCHAR", "VARCHAR", "NVARCHAR", "CHARACTER", "VARCHARACTER", - "TIMESTAMP", "DATETIME", "DATE", "TIME", "YEAR", "SQL_TSI_YEAR", - "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", - "ENUM", "SET", "JSON", -} - -var dataTypeNum = []string{ - "INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT", - "INT", "INT1", "INT2", "INT3", "INT8", - "FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION", - "DECIMAL", "NUMERIC", "FIXED", - "BOOL", "BOOLEAN", -} - -var dataTypeBin = []string{ - "BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG", - "BINARY", "VARBINARY", - "BIT", -} +var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{}) func escapeBackslashSQL(s []byte, bf *bytes.Buffer) { var (