Skip to content

Commit

Permalink
Merge branch 'master' into fix-empty-region
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace committed Sep 6, 2021
2 parents 3b3cc05 + 8a7bd55 commit 1c98275
Show file tree
Hide file tree
Showing 53 changed files with 2,735 additions and 1,190 deletions.
53 changes: 45 additions & 8 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand All @@ -69,6 +68,7 @@ import (
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/ranger"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -967,25 +967,62 @@ func NewLocalBackend(
}

func (local *local) checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController) error {
stores, err := conn.GetAllTiKVStores(ctx, pdCtl.GetPDClient(), conn.SkipTiFlash)
stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}

hasTiFlash := false
for _, s := range stores {
client, err := local.getImportClient(ctx, s.Id)
if err != nil {
return errors.Trace(err)
if version.IsTiFlash(s) {
hasTiFlash = true
break
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err != nil {
}

for _, s := range stores {
// skip stores that are not online
if s.State != metapb.StoreState_Up || version.IsTiFlash(s) {
continue
}
var err error
for i := 0; i < maxRetryTimes; i++ {
if i > 0 {
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
}
}
client, err1 := local.getImportClient(ctx, s.Id)
if err1 != nil {
err = err1
log.L().Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
continue
}
_, err = client.MultiIngest(ctx, &sst.MultiIngestRequest{})
if err == nil {
break
}
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.L().Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
}
}
return errors.Trace(err)
log.L().Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
zap.Int("retry", i))
}
if err != nil {
// If the cluster contains no TiFlash store, the multi-ingest feature is not needed,
// so in this case downgrade the logic instead of returning an error.
if hasTiFlash {
return errors.Trace(err)
}
log.L().Warn("check multi failed all retry, fallback to false", log.ShortError(err))
local.supportMultiIngest = false
return nil
}
}

Expand Down
31 changes: 28 additions & 3 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"time"
"unicode/utf8"

"github.com/BurntSushi/toml"
"github.com/docker/go-units"
Expand Down Expand Up @@ -93,6 +94,9 @@ const (
maxRetryTimes = 4
defaultRetryBackoffTime = 100 * time.Millisecond
pdStores = "/pd/api/v1/stores"

defaultCSVDataCharacterSet = "binary"
defaultCSVDataInvalidCharReplace = utf8.RuneError
)

var (
Expand Down Expand Up @@ -243,6 +247,7 @@ type PostRestore struct {
}

type CSVConfig struct {
// Separator, Delimiter and Terminator should all be in utf8mb4 encoding.
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Expand All @@ -269,6 +274,16 @@ type MydumperRuntime struct {
StrictFormat bool `toml:"strict-format" json:"strict-format"`
DefaultFileRules bool `toml:"default-file-rules" json:"default-file-rules"`
IgnoreColumns AllIgnoreColumns `toml:"ignore-data-columns" json:"ignore-data-columns"`
// DataCharacterSet is the character set of the source file. Only CSV files are supported now. The following options are supported.
// - utf8mb4
// - GB18030
// - GBK: an extension of the GB2312 character set and is also known as Code Page 936.
// - binary: no attempt to convert the encoding.
// Leaving DataCharacterSet empty makes it use `binary` by default.
DataCharacterSet string `toml:"data-character-set" json:"data-character-set"`
// DataInvalidCharReplace is the replacement string for incompatible characters; it must not be the same as the separators or line breaks.
// Changing the default value will result in increased parsing time. Non-compatible characters do not cause an increase in error.
DataInvalidCharReplace string `toml:"data-invalid-char-replace" json:"data-invalid-char-replace"`
}

type AllIgnoreColumns []*IgnoreColumns
Expand Down Expand Up @@ -309,6 +324,10 @@ type FileRouteRule struct {
Type string `json:"type" toml:"type" yaml:"type"`
Key string `json:"key" toml:"key" yaml:"key"`
Compression string `json:"compression" toml:"compression" yaml:"compression"`
// TODO: DataCharacterSet here could override the same field in [mydumper.csv] with a higher priority.
// This would give users a more flexible way to configure different files with
// different data charsets.
// DataCharacterSet string `toml:"data-character-set" json:"data-character-set"`
}

type TikvImporter struct {
Expand Down Expand Up @@ -427,9 +446,11 @@ func NewConfig() *Config {
BackslashEscape: true,
TrimLastSep: false,
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Filter: DefaultFilter,
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Filter: DefaultFilter,
DataCharacterSet: defaultCSVDataCharacterSet,
DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace),
},
TikvImporter: TikvImporter{
Backend: "",
Expand Down Expand Up @@ -578,6 +599,10 @@ func (cfg *Config) Adjust(ctx context.Context) error {
cfg.Mydumper.DefaultFileRules = true
}

if len(cfg.Mydumper.DataCharacterSet) == 0 {
cfg.Mydumper.DataCharacterSet = defaultCSVDataCharacterSet
}

if cfg.TikvImporter.Backend == "" {
return errors.New("tikv-importer.backend must not be empty!")
}
Expand Down
84 changes: 84 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,3 +710,87 @@ func (s *configTestSuite) TestAdjustDiskQuota(c *C) {
c.Assert(cfg.Adjust(ctx), IsNil)
c.Assert(int64(cfg.TikvImporter.DiskQuota), Equals, int64(0))
}

// TestDataCharacterSet loads a series of [mydumper] TOML snippets that set
// either data-character-set or data-invalid-char-replace, then runs Adjust on
// the resulting config. Each case states the error text it expects from
// Adjust; an empty expectation means Adjust must succeed. All cases listed
// here currently expect success.
func (s *configTestSuite) TestDataCharacterSet(c *C) {
	type charsetCase struct {
		toml    string
		wantErr string
	}
	cases := []charsetCase{
		{toml: `
[mydumper]
data-character-set = 'binary'
`},
		{toml: `
[mydumper]
data-character-set = 'utf8mb4'
`},
		{toml: `
[mydumper]
data-character-set = 'gb18030'
`},
		{toml: `
[mydumper]
data-invalid-char-replace = "\u2323"
`},
		{toml: `
[mydumper]
data-invalid-char-replace = "a"
`},
		{toml: `
[mydumper]
data-invalid-char-replace = "INV"
`},
		{toml: `
[mydumper]
data-invalid-char-replace = "😊"
`},
		{toml: `
[mydumper]
data-invalid-char-replace = "😊😭😅😄"
`},
	}

	for i := range cases {
		tt := cases[i]
		hint := Commentf("input = %s", tt.toml)

		// Start from the defaults and fill in the fields this test sets by
		// hand before the snippet is applied on top.
		cfg := config.NewConfig()
		cfg.Mydumper.SourceDir = "file://."
		cfg.TiDB.Port = 4000
		cfg.TiDB.PdAddr = "test.invalid:2379"
		cfg.TikvImporter.Backend = config.BackendLocal
		cfg.TikvImporter.SortedKVDir = "."
		cfg.TiDB.DistSQLScanConcurrency = 1

		// Parsing the snippet itself must always succeed.
		loadErr := cfg.LoadFromTOML([]byte(tt.toml))
		c.Assert(loadErr, IsNil)

		adjustErr := cfg.Adjust(context.Background())
		if tt.wantErr == "" {
			c.Assert(adjustErr, IsNil, hint)
			continue
		}
		// The expected text is matched literally, hence QuoteMeta.
		c.Assert(adjustErr, ErrorMatches, regexp.QuoteMeta(tt.wantErr), hint)
	}
}
Loading

0 comments on commit 1c98275

Please sign in to comment.