Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
config,mydumper: replace black-white-list by table-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm committed Jun 16, 2020
1 parent 368d52e commit f333f5a
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 57 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ go 1.13
require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/carlmjohnson/flagext v0.0.11
github.com/cockroachdb/pebble v0.0.0-20200601233547-7956a7440a70
github.com/coreos/go-semver v0.3.0
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 // indirect
github.com/go-bindata/go-bindata v3.1.2+incompatible // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gogo/protobuf v1.3.1
github.com/golang/mock v1.4.3
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf // indirect
github.com/joho/sqltocsv v0.0.0-20190824231449-5650f27fd5b6
github.com/pingcap/br v0.0.0-20200521085655-53201addd4ad
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
Expand All @@ -23,7 +22,7 @@ require (
github.com/pingcap/parser v0.0.0-20200522094936-3b720a0512a6
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/tidb v1.1.0-beta.0.20200527030457-572bba0499e1
github.com/pingcap/tidb-tools v4.0.0-rc.1.0.20200514040632-f76b3e428e19+incompatible
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/satori/go.uuid v1.2.0
Expand Down
92 changes: 76 additions & 16 deletions go.sum

Large diffs are not rendered by default.

28 changes: 19 additions & 9 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
tidbcfg "github.com/pingcap/tidb/config"
"go.uber.org/zap"
Expand Down Expand Up @@ -91,12 +91,13 @@ type Config struct {

Checkpoint Checkpoint `toml:"checkpoint" json:"checkpoint"`
Mydumper MydumperRuntime `toml:"mydumper" json:"mydumper"`
BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"`
TikvImporter TikvImporter `toml:"tikv-importer" json:"tikv-importer"`
PostRestore PostRestore `toml:"post-restore" json:"post-restore"`
Cron Cron `toml:"cron" json:"cron"`
Routes []*router.TableRule `toml:"routes" json:"routes"`
Security Security `toml:"security" json:"security"`

BWList filter.MySQLReplicationRules `toml:"black-white-list" json:"black-white-list"`
}

func (c *Config) String() string {
Expand Down Expand Up @@ -149,6 +150,7 @@ type MydumperRuntime struct {
CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
StrictFormat bool `toml:"strict-format" json:"strict-format"`
MaxRegionSize int64 `toml:"max-region-size" json:"max-region-size"`
Filter []string `toml:"filter" json:"filter"`
}

type TikvImporter struct {
Expand Down Expand Up @@ -256,6 +258,7 @@ func NewConfig() *Config {
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Filter: []string{"*.*"},
},
TikvImporter: TikvImporter{
Backend: BackendImporter,
Expand All @@ -268,7 +271,6 @@ func NewConfig() *Config {
Checksum: true,
Analyze: true,
},
BWList: &filter.Rules{},
}
}

Expand All @@ -286,6 +288,7 @@ func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error {
cfg.TiDB.PdAddr = global.TiDB.PdAddr
cfg.Mydumper.SourceDir = global.Mydumper.SourceDir
cfg.Mydumper.NoSchema = global.Mydumper.NoSchema
cfg.Mydumper.Filter = global.Mydumper.Filter
cfg.TikvImporter.Addr = global.TikvImporter.Addr
cfg.TikvImporter.Backend = global.TikvImporter.Backend
cfg.TikvImporter.SortedKVDir = global.TikvImporter.SortedKVDir
Expand Down Expand Up @@ -454,12 +457,13 @@ func (cfg *Config) Adjust() error {
return errors.Errorf("invalid config: unsupported `tidb.tls` config %s", cfg.TiDB.TLS)
}

cfg.BWList.IgnoreDBs = append(cfg.BWList.IgnoreDBs,
"mysql",
"information_schema",
"performance_schema",
"sys",
)
// mydumper.filter and black-white-list cannot co-exist.
if cfg.HasLegacyBlackWhiteList() {
log.L().Warn("the config `black-white-list` has been deprecated, please replace with `mydumper.filter`")
if !(len(cfg.Mydumper.Filter) == 1 && cfg.Mydumper.Filter[0] == "*.*") {
return errors.New("invalid config: `mydumper.filter` and `black-white-list` cannot be simultaneously defined")
}
}

for _, rule := range cfg.Routes {
if !cfg.Mydumper.CaseSensitive {
Expand Down Expand Up @@ -538,3 +542,9 @@ func (cfg *Config) Adjust() error {

return nil
}

// HasLegacyBlackWhiteList checks whether the deprecated [black-white-list] section
// was defined.
func (cfg *Config) HasLegacyBlackWhiteList() bool {
return len(cfg.BWList.DoTables) != 0 || len(cfg.BWList.DoDBs) != 0 || len(cfg.BWList.IgnoreTables) != 0 || len(cfg.BWList.IgnoreDBs) != 0
}
19 changes: 15 additions & 4 deletions lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/carlmjohnson/flagext"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
Expand Down Expand Up @@ -49,8 +50,9 @@ type GlobalTiDB struct {
}

type GlobalMydumper struct {
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
Filter []string `toml:"filter" json:"filter"`
}

type GlobalImporter struct {
Expand Down Expand Up @@ -95,6 +97,9 @@ func NewGlobalConfig() *GlobalConfig {
StatusPort: 10080,
LogLevel: "error",
},
Mydumper: GlobalMydumper{
Filter: []string{"*.*"},
},
TikvImporter: GlobalImporter{
Backend: "importer",
},
Expand Down Expand Up @@ -136,7 +141,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
fs.StringVar(&configFilePath, "config", "", "tidb-lightning configuration file")
printVersion := fs.Bool("V", false, "print version of lightning")

logLevel := fs.String("L", "", `log level: info, debug, warn, error, fatal (default "info")`)
logLevel := flagext.ChoiceVar(fs, "L", "info", `log level: info, debug, warn, error, fatal`, "info", "debug", "warn", "warning", "error", "fatal")
logFilePath := fs.String("log-file", timestampLogFileName(), "log file path")
tidbHost := fs.String("tidb-host", "", "TiDB server host")
tidbPort := fs.Int("tidb-port", 0, "TiDB server port (default 4000)")
Expand All @@ -146,7 +151,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
pdAddr := fs.String("pd-urls", "", "PD endpoint address")
dataSrcPath := fs.String("d", "", "Directory of the dump to import")
importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer")
backend := fs.String("backend", "", `delivery backend ("importer" or "tidb" or "local")`)
backend := flagext.ChoiceVar(fs, "backend", "importer", `delivery backend ("importer" or "tidb" or "local")`, "importer", "tidb", "local")
sortedKVDir := fs.String("sorted-kv-dir", "", "path for KV pairs when local backend enabled")
enableCheckpoint := fs.Bool("enable-checkpoint", true, "whether to enable checkpoints")
noSchema := fs.Bool("no-schema", false, "ignore schema files, get schema directly from TiDB instead")
Expand All @@ -160,6 +165,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
statusAddr := fs.String("status-addr", "", "the Lightning server address")
serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately")

var filter []string
flagext.StringsVar(fs, &filter, "f", "select tables to import")

if extraFlags != nil {
extraFlags(fs)
}
Expand Down Expand Up @@ -252,6 +260,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *tlsKeyPath != "" {
cfg.Security.KeyPath = *tlsKeyPath
}
if len(filter) > 0 {
cfg.Mydumper.Filter = filter
}

if cfg.App.StatusAddr == "" && cfg.App.ServerMode {
return nil, errors.New("If server-mode is enabled, the status-addr must be a valid listen address")
Expand Down
10 changes: 8 additions & 2 deletions lightning/lightning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func (s *lightningSuite) TestRun(c *C) {
c.Assert(err, ErrorMatches, ".*mydumper dir does not exist")

err = lightning.run(&config.Config{
Mydumper: config.MydumperRuntime{SourceDir: "."},
Mydumper: config.MydumperRuntime{
SourceDir: ".",
Filter: []string{"*.*"},
},
Checkpoint: config.Checkpoint{
Enable: true,
Driver: "invalid",
Expand All @@ -67,7 +70,10 @@ func (s *lightningSuite) TestRun(c *C) {
c.Assert(err, ErrorMatches, "Unknown checkpoint driver invalid")

err = lightning.run(&config.Config{
Mydumper: config.MydumperRuntime{SourceDir: "."},
Mydumper: config.MydumperRuntime{
SourceDir: ".",
Filter: []string{"*.*"},
},
Checkpoint: config.Checkpoint{
Enable: true,
Driver: "file",
Expand Down
22 changes: 17 additions & 5 deletions lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ type MDLoader struct {
dir string
noSchema bool
dbs []*MDDatabaseMeta
filter *filter.Filter
filter filter.Filter
router *router.Table
charSet string
}
Expand All @@ -80,18 +80,27 @@ type mdLoaderSetup struct {

func NewMyDumpLoader(cfg *config.Config) (*MDLoader, error) {
var r *router.Table
var err error
if len(cfg.Routes) > 0 {
var err error
r, err = router.NewTableRouter(cfg.Mydumper.CaseSensitive, cfg.Routes)
if err != nil {
return nil, errors.Trace(err)
}
}

f, err := filter.New(false, cfg.BWList)
// use the legacy black-white-list if defined. otherwise use the new filter.
var f filter.Filter
if cfg.HasLegacyBlackWhiteList() {
f, err = filter.ParseMySQLReplicationRules(&cfg.BWList)
} else {
f, err = filter.Parse(cfg.Mydumper.Filter)
}
if err != nil {
return nil, err
}
if !cfg.Mydumper.CaseSensitive {
f = filter.CaseInsensitive(f)
}

mdl := &MDLoader{
dir: cfg.Mydumper.SourceDir,
Expand Down Expand Up @@ -297,7 +306,10 @@ func (s *mdLoaderSetup) listFiles(dir string) error {
}

func (l *MDLoader) shouldSkip(table *filter.Table) bool {
return len(l.filter.ApplyOn([]*filter.Table{table})) == 0
if len(table.Name) == 0 {
return !l.filter.MatchSchema(table.Schema)
}
return !l.filter.MatchTable(table.Schema, table.Name)
}

func (s *mdLoaderSetup) route() error {
Expand Down
15 changes: 12 additions & 3 deletions lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,17 @@ type testMydumpLoaderSuite struct {
func (s *testMydumpLoaderSuite) SetUpSuite(c *C) {}
func (s *testMydumpLoaderSuite) TearDownSuite(c *C) {}

func newConfigWithSourceDir(sourceDir string) *config.Config {
return &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: sourceDir,
Filter: []string{"*.*"},
},
}
}

func (s *testMydumpLoaderSuite) SetUpTest(c *C) {
s.cfg = &config.Config{Mydumper: config.MydumperRuntime{SourceDir: c.MkDir()}}
s.cfg = newConfigWithSourceDir(c.MkDir())
}

func (s *testMydumpLoaderSuite) touch(c *C, filename ...string) string {
Expand All @@ -59,11 +68,11 @@ func (s *testMydumpLoaderSuite) mkdir(c *C, dirname string) {
}

func (s *testMydumpLoaderSuite) TestLoader(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./not-exists"}}
cfg := newConfigWithSourceDir("./not-exists")
_, err := md.NewMyDumpLoader(cfg)
c.Assert(err, NotNil)

cfg = &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
cfg = newConfigWithSourceDir("./examples")
mdl, err := md.NewMyDumpLoader(cfg)
c.Assert(err, IsNil)

Expand Down
3 changes: 2 additions & 1 deletion lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func getFileSize(file string) (int64, error) {
TODO : test with specified 'regionBlockSize' ...
*/
func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
cfg := newConfigWithSourceDir("./examples")
loader, _ := NewMyDumpLoader(cfg)
dbMeta := loader.GetDatabases()[0]

Expand Down Expand Up @@ -216,6 +216,7 @@ func (s *testMydumpRegionSuite) TestSplitLargeFile(c *C) {
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
},
}
filePath := "./csv/split_large_file.csv"
Expand Down
18 changes: 4 additions & 14 deletions tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ strict-format = false
# will restore in parallel. The size of each chunk is `max-region-size`, where the default is 256 MiB.
#max-region-size = 268_435_456

# only import tables if the wildcard rules are matched. See documention for details.
filter = ['*.*']

# CSV files are imported according to MySQL's LOAD DATA INFILE rules.
[mydumper.csv]
# separator between fields, should be an ASCII character.
Expand Down Expand Up @@ -210,22 +213,9 @@ switch-mode = "5m"
# the duration which the an import progress will be printed to the log.
log-progress = "5m"

## Table filter options. See the documentation for details
# [black-white-list]
# do-dbs = ["patterns"]
# ignore-dbs = ["patterns"]
#
# [[black-white-list.do-tables]]
# db-name = "db-pattern"
# table-name = "table-pattern"
#
# [[black-white-list.ignore-tables]]
# db-name = "db-pattern"
# table-name = "table-pattern"

## Rules to rename existing databases/tables and possibly merge them together.
## The patterns support wildcards with `*` and `?`.
## Routes are applied _after_ black-white-list.
## Routes are applied _after_ filter.
# [[routes]]
# schema-pattern = "shard_db_*"
# table-pattern = "shard_table_*"
Expand Down

0 comments on commit f333f5a

Please sign in to comment.